Posted to commits@impala.apache.org by tm...@apache.org on 2020/06/26 15:40:37 UTC

[impala] 02/02: IMPALA-9697: Support priority based scratch directory selection

This is an automated email from the ASF dual-hosted git repository.

tmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 3b9ae415e22296683fd905e590c02fe7de3d668c
Author: Abhishek Rawat <ar...@cloudera.com>
AuthorDate: Fri Jun 5 07:24:01 2020 -0700

    IMPALA-9697: Support priority based scratch directory selection
    
    The '--scratch_dirs' configuration option now supports specifying the
    priority of each scratch directory. The lower the numeric value, the
    higher the priority. If no priority is specified, the default priority
    of numeric_limits<int>::max() is used.
    
    Valid formats for specifying the priority are listed below; an
    illustrative example follows the list:
    - <dir-path>:<limit>:<priority>
    - <dir-path>::<priority>
    The following formats use the default priority:
    - <dir-path>
    - <dir-path>:<limit>
    - <dir-path>:<limit>:
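
    For example (illustrative paths and sizes only), a setting such as
    --scratch_dirs=/ssd1:50GB:0,/ssd2:50GB:0,/hdd1::1 spills to /ssd1 and
    /ssd2 (priority 0) in a round-robin manner, and falls back to /hdd1
    (priority 1, no size limit) only once both priority-0 directories are
    at capacity.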
    
    The new logic in TmpFileGroup::AllocateSpace() tries to find a target
    file using a prioritized round-robin scheme. Files are ordered in
    decreasing order of their priority; the priority of a file is the same
    as the priority of its directory. A target file is selected by always
    searching the ordered list starting from the files with the highest
    priority. If multiple files have the same priority, the target file is
    selected among them in a round-robin manner (a simplified sketch
    follows).
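
    The core of the scheme as a minimal standalone sketch (hypothetical,
    simplified names; the real logic is in TmpFileGroup::CreateFiles() and
    TmpFileGroup::AllocateSpace() in the diff below):

      #include <map>
      #include <vector>

      // Files are grouped into contiguous [start, end] index ranges, one
      // per priority (lower value = higher priority; std::map iterates in
      // ascending key order), and each range keeps its own cursor.
      struct IndexRange { int start; int end; };

      // Returns the index of the next file to try, scanning priorities
      // from highest to lowest and rotating the cursor within each range.
      // Assumes each cursor was initialized to a value within its range.
      // Returns -1 if every file is at capacity.
      int PickFileIndex(const std::map<int, IndexRange>& ranges,
          std::map<int, int>* cursors, const std::vector<bool>& file_full) {
        for (const auto& entry : ranges) {
          const IndexRange& r = entry.second;
          const int len = r.end - r.start + 1;
          for (int attempt = 0; attempt < len; ++attempt) {
            int idx = (*cursors)[entry.first];
            // Advance the cursor round-robin within [start, end].
            (*cursors)[entry.first] = r.start + (idx - r.start + 1) % len;
            if (!file_full[idx]) return idx;
          }
        }
        return -1;
      }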
    
    Testing:
    - Added unit and e2e tests for priority based spilling logic.
    
    Change-Id: I381c3a358e1382e6696325fec74667f1fa18dd17
    Reviewed-on: http://gerrit.cloudera.org:8080/16091
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/tmp-file-mgr-test.cc       | 191 ++++++++++++++++++++++++++++--
 be/src/runtime/tmp-file-mgr.cc            | 127 +++++++++++++++-----
 be/src/runtime/tmp-file-mgr.h             |  36 ++++--
 tests/custom_cluster/test_scratch_disk.py |  67 +++++++++++
 4 files changed, 371 insertions(+), 50 deletions(-)

diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc
index fcf581c..0a5ec60 100644
--- a/be/src/runtime/tmp-file-mgr-test.cc
+++ b/be/src/runtime/tmp-file-mgr-test.cc
@@ -64,6 +64,7 @@ static const int64_t TERABYTE = 1024L * GIGABYTE;
 
 class TmpFileMgrTest : public ::testing::Test {
  public:
+  static const int DEFAULT_PRIORITY = numeric_limits<int>::max();
   virtual void SetUp() {
     // Reset query options that are modified by tests.
     FLAGS_disk_spill_encryption = false;
@@ -170,8 +171,8 @@ class TmpFileMgrTest : public ::testing::Test {
   }
 
   /// Helper to set FileGroup::next_allocation_index_.
-  static void SetNextAllocationIndex(TmpFileGroup* group, int value) {
-    group->next_allocation_index_ = value;
+  static void SetNextAllocationIndex(TmpFileGroup* group, int priority, int value) {
+    group->next_allocation_index_[priority] = value;
   }
 
   /// Helper to cancel the FileGroup RequestContext.
@@ -193,6 +194,16 @@ class TmpFileMgrTest : public ::testing::Test {
     return bytes_allocated;
   }
 
+  /// Helper to validate the [start, end] index range for a given priority for a given
+  /// file group.
+  static void ValidatePriorityIndexRange(TmpFileGroup* group, int priority, int start,
+    int end) {
+    auto search = group->tmp_files_index_range_.find(priority);
+    EXPECT_TRUE(search != group->tmp_files_index_range_.end());
+    EXPECT_EQ(start, search->second.start);
+    EXPECT_EQ(end, search->second.end);
+  }
+
   /// Helpers to call WriteHandle methods.
   void Cancel(TmpWriteHandle* handle) { handle->Cancel(); }
   void WaitForWrite(TmpWriteHandle* handle) {
@@ -445,7 +456,7 @@ void TmpFileMgrTest::TestScratchLimit(bool punch_holes, int64_t alloc_size) {
   TmpFile* alloc_file;
 
   // Alloc from file 1 should succeed.
-  SetNextAllocationIndex(&file_group, 0);
+  SetNextAllocationIndex(&file_group, DEFAULT_PRIORITY, 0);
   ASSERT_OK(GroupAllocateSpace(&file_group, alloc_size, &alloc_file, &offset));
   ASSERT_EQ(alloc_file, files[0]); // Should select files round-robin.
   ASSERT_EQ(0, offset);
@@ -697,12 +708,12 @@ TEST_F(TmpFileMgrTest, TestHWMMetric) {
   TmpFile* alloc_file;
 
   // Alloc from file_group_1 and file_group_2 interleaving allocations.
-  SetNextAllocationIndex(&file_group_1, 0);
+  SetNextAllocationIndex(&file_group_1, DEFAULT_PRIORITY, 0);
   ASSERT_OK(GroupAllocateSpace(&file_group_1, ALLOC_SIZE, &alloc_file, &offset));
   ASSERT_EQ(alloc_file, files[0]);
   ASSERT_EQ(0, offset);
 
-  SetNextAllocationIndex(&file_group_2, 0);
+  SetNextAllocationIndex(&file_group_2, DEFAULT_PRIORITY, 0);
   ASSERT_OK(GroupAllocateSpace(&file_group_2, ALLOC_SIZE, &alloc_file, &offset));
   ASSERT_EQ(alloc_file, files[2]);
   ASSERT_EQ(0, offset);
@@ -772,7 +783,7 @@ void TmpFileMgrTest::TestDirectoryLimits(bool punch_holes) {
 
   // Allocate three times - once per directory. We expect these allocations to go through
   // so we should have one allocation in each directory.
-  SetNextAllocationIndex(&file_group_1, 0);
+  SetNextAllocationIndex(&file_group_1, DEFAULT_PRIORITY, 0);
   for (int i = 0; i < tmp_dir_specs.size(); ++i) {
     ASSERT_OK(GroupAllocateSpace(&file_group_1, ALLOC_SIZE, &alloc_file, &offset));
   }
@@ -847,7 +858,7 @@ TEST_F(TmpFileMgrTest, TestDirectoryLimitsExhausted) {
   TmpFile* alloc_file;
 
   // Allocate exactly the maximum total capacity of the directories.
-  SetNextAllocationIndex(&file_group_1, 0);
+  SetNextAllocationIndex(&file_group_1, DEFAULT_PRIORITY, 0);
   for (int i = 0; i < MAX_ALLOCATIONS; ++i) {
     ASSERT_OK(GroupAllocateSpace(&file_group_1, ALLOC_SIZE, &alloc_file, &offset));
   }
@@ -883,9 +894,12 @@ TEST_F(TmpFileMgrTest, TestDirectoryLimitParsing) {
       "/tmp/tmp-file-mgr-test6", "/tmp/tmp-file-mgr-test7"});
   // Configure various directories with valid formats.
   auto& dirs = GetTmpDirs(
-      CreateTmpFileMgr("/tmp/tmp-file-mgr-test1:5g,/tmp/tmp-file-mgr-test2,"
-                       "/tmp/tmp-file-mgr-test3:1234,/tmp/tmp-file-mgr-test4:99999999,"
-                       "/tmp/tmp-file-mgr-test5:200tb,/tmp/tmp-file-mgr-test6:100MB"));
+      CreateTmpFileMgr("/tmp/tmp-file-mgr-test1:5g:1,"
+                       "/tmp/tmp-file-mgr-test2::2,"
+                       "/tmp/tmp-file-mgr-test3:1234:3,"
+                       "/tmp/tmp-file-mgr-test4:99999999:4,"
+                       "/tmp/tmp-file-mgr-test5:200tb:5,"
+                       "/tmp/tmp-file-mgr-test6:100MB:6"));
   EXPECT_EQ(6, dirs.size());
   EXPECT_EQ(5 * GIGABYTE, dirs[0].bytes_limit);
   EXPECT_EQ(numeric_limits<int64_t>::max(), dirs[1].bytes_limit);
@@ -918,7 +932,7 @@ TEST_F(TmpFileMgrTest, TestDirectoryLimitParsing) {
   // Extra colons
   auto& dirs4 = GetTmpDirs(
       CreateTmpFileMgr("/tmp/tmp-file-mgr-test1:1:,/tmp/tmp-file-mgr-test2:10mb::"));
-  EXPECT_EQ(0, dirs4.size());
+  EXPECT_EQ(1, dirs4.size());
 
   // Empty strings.
   auto& nodirs = GetTmpDirs(CreateTmpFileMgr(""));
@@ -1020,4 +1034,159 @@ void TmpFileMgrTest::TestCompressBufferManagement() {
   compressed_buffer_tracker->Release(DATA.size() * 2);
   file_group.Close();
 }
+
+// Test the directory parsing logic, including the various error cases.
+TEST_F(TmpFileMgrTest, TestDirectoryPriorityParsing) {
+  RemoveAndCreateDirs({"/tmp/tmp-file-mgr-test1", "/tmp/tmp-file-mgr-test2",
+      "/tmp/tmp-file-mgr-test3", "/tmp/tmp-file-mgr-test4", "/tmp/tmp-file-mgr-test5",
+      "/tmp/tmp-file-mgr-test6", "/tmp/tmp-file-mgr-test7"});
+  // Configure various directories with valid formats. The directories are passed in
+  // unsorted priority order. The expectation is that the resulting directory lists will
+  // be sorted by priority.
+  auto& dirs = GetTmpDirs(
+      CreateTmpFileMgr("/tmp/tmp-file-mgr-test3:1234:3,/tmp/tmp-file-mgr-test6:100MB:,"
+                       "/tmp/tmp-file-mgr-test2::2,/tmp/tmp-file-mgr-test4:99999999:4,"
+                       "/tmp/tmp-file-mgr-test5:200tb:5,/tmp/tmp-file-mgr-test1:5g:1"));
+  EXPECT_EQ(6, dirs.size());
+  EXPECT_EQ(5 * GIGABYTE, dirs[0].bytes_limit);
+  EXPECT_EQ(1, dirs[0].priority);
+  EXPECT_EQ(numeric_limits<int64_t>::max(), dirs[1].bytes_limit);
+  EXPECT_EQ(2, dirs[1].priority);
+  EXPECT_EQ(1234, dirs[2].bytes_limit);
+  EXPECT_EQ(3, dirs[2].priority);
+  EXPECT_EQ(99999999, dirs[3].bytes_limit);
+  EXPECT_EQ(4, dirs[3].priority);
+  EXPECT_EQ(200 * TERABYTE, dirs[4].bytes_limit);
+  EXPECT_EQ(5, dirs[4].priority);
+  EXPECT_EQ(100 * MEGABYTE, dirs[5].bytes_limit);
+  EXPECT_EQ(numeric_limits<int>::max(), dirs[5].priority);
+
+  // Various invalid limit formats result in the directory getting skipped.
+  // Include a valid dir on the end to ensure that we don't short-circuit all
+  // directories.
+  auto& dirs2 = GetTmpDirs(
+      CreateTmpFileMgr("/tmp/tmp-file-mgr-test1::foo,/tmp/tmp-file-mgr-test2::?,"
+                       "/tmp/tmp-file-mgr-test3::1.2.3.4,/tmp/tmp-file-mgr-test4:: ,"
+                       "/tmp/tmp-file-mgr-test5::p0,/tmp/tmp-file-mgr-test6::10%,"
+                       "/tmp/tmp-file-mgr-test1:100:-1"));
+  EXPECT_EQ(1, dirs2.size());
+  EXPECT_EQ("/tmp/tmp-file-mgr-test1/impala-scratch", dirs2[0].path);
+  EXPECT_EQ(100, dirs2[0].bytes_limit);
+  EXPECT_EQ(-1, dirs2[0].priority);
+}
+
+// Tests that when TmpFileGroup is constructed, the priority based index ranges are
+// populated properly.
+TEST_F(TmpFileMgrTest, TestPriorityBasedIndexRanges) {
+  // Tmp dirs with priority 0, 1 and 2.
+  vector<string> tmp_dirs({"/tmp/tmp-file-mgr-test5::1", "/tmp/tmp-file-mgr-test2::0",
+      "/tmp/tmp-file-mgr-test7::2", "/tmp/tmp-file-mgr-test1::0",
+      "/tmp/tmp-file-mgr-test6::1", "/tmp/tmp-file-mgr-test3::0",
+      "/tmp/tmp-file-mgr-test4::1"});
+  // Tmp dirs with priority 0 and 1.
+  vector<string> tmp_dirs1({"/tmp/tmp-file-mgr-test5::0", "/tmp/tmp-file-mgr-test2::1",
+      "/tmp/tmp-file-mgr-test7::0", "/tmp/tmp-file-mgr-test1::1",
+      "/tmp/tmp-file-mgr-test6::0", "/tmp/tmp-file-mgr-test3::1",
+      "/tmp/tmp-file-mgr-test4::0"});
+  // Tmp dirs with priority 0.
+  vector<string> tmp_dirs2({"/tmp/tmp-file-mgr-test1::0", "/tmp/tmp-file-mgr-test2::0",
+      "/tmp/tmp-file-mgr-test3::0", "/tmp/tmp-file-mgr-test4::0"});
+  TmpFileMgr tmp_file_mgr;
+  TmpFileMgr tmp_file_mgr1;
+  TmpFileMgr tmp_file_mgr2;
+  scoped_ptr<MetricGroup> metrics1(new MetricGroup("tmp-file-mgr-test"));
+  scoped_ptr<MetricGroup> metrics2(new MetricGroup("tmp-file-mgr-test"));
+  ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dirs, false, "", false, metrics_.get()));
+  ASSERT_OK(tmp_file_mgr1.InitCustom(tmp_dirs1, false, "", false, metrics1.get()));
+  ASSERT_OK(tmp_file_mgr2.InitCustom(tmp_dirs2, false, "", false, metrics2.get()));
+  TUniqueId id;
+  TmpFileGroup file_group(&tmp_file_mgr, io_mgr(), profile_, id);
+  ValidatePriorityIndexRange(&file_group, 0, 0, 2);
+  ValidatePriorityIndexRange(&file_group, 1, 3, 5);
+  ValidatePriorityIndexRange(&file_group, 2, 6, 6);
+  TmpFileGroup file_group1(&tmp_file_mgr1, io_mgr(), profile_, id);
+  ValidatePriorityIndexRange(&file_group1, 0, 0, 3);
+  ValidatePriorityIndexRange(&file_group1, 1, 4, 6);
+  TmpFileGroup file_group2(&tmp_file_mgr2, io_mgr(), profile_, id);
+  ValidatePriorityIndexRange(&file_group2, 0, 0, 3);
+  file_group.Close();
+  file_group1.Close();
+  file_group2.Close();
+}
+
+// Tests various scenarios with priority based spilling.
+TEST_F(TmpFileMgrTest, TestPriorityBasedSpilling) {
+  vector<string> tmp_dirs({"/tmp/tmp-file-mgr-test1:1K:0", "/tmp/tmp-file-mgr-test2:1K:3",
+      "/tmp/tmp-file-mgr-test3:1K:2", "/tmp/tmp-file-mgr-test4:1K:2",
+      "/tmp/tmp-file-mgr-test5:1K:2", "/tmp/tmp-file-mgr-test6:1K:1"});
+  RemoveAndCreateDirs(tmp_dirs);
+  TmpFileMgr tmp_file_mgr;
+  ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dirs, false, "", false, metrics_.get()));
+
+  // The allocation size is same as the limit per directory.
+  // So each scratch directory can fit exactly one allocation.
+  const int alloc_size = 1024;
+  const int64_t limit = alloc_size * 6;
+  TUniqueId id;
+  TmpFileGroup file_group(&tmp_file_mgr, io_mgr(), profile_, id, limit);
+  TUniqueId id1;
+  TmpFileGroup file_group1(&tmp_file_mgr, io_mgr(), profile_, id1, limit);
+  TUniqueId id2;
+  TmpFileGroup file_group2(&tmp_file_mgr, io_mgr(), profile_, id2, limit);
+
+  vector<TmpFile*> files;
+  ASSERT_OK(CreateFiles(&file_group, &files));
+  vector<TmpFile*> files1;
+  ASSERT_OK(CreateFiles(&file_group1, &files1));
+  vector<TmpFile*> files2;
+  ASSERT_OK(CreateFiles(&file_group2, &files2));
+
+  int64_t offset;
+  TmpFile* alloc_file;
+
+  // filegroup should allocate from file at index 0
+  ASSERT_OK(GroupAllocateSpace(&file_group, alloc_size, &alloc_file, &offset));
+  ASSERT_EQ(alloc_file, files[0]);
+
+  // filegroup1 should allocate from file at index 1
+  ASSERT_OK(GroupAllocateSpace(&file_group1, alloc_size, &alloc_file, &offset));
+  ASSERT_EQ(alloc_file, files1[1]);
+
+  SetNextAllocationIndex(&file_group1, 2, 4);
+  // filegroup1 should allocate from file at index 4
+  ASSERT_OK(GroupAllocateSpace(&file_group1, alloc_size, &alloc_file, &offset));
+  ASSERT_EQ(alloc_file, files1[4]);
+
+  // filegroup1 should allocate from file at index 2
+  ASSERT_OK(GroupAllocateSpace(&file_group1, alloc_size, &alloc_file, &offset));
+  ASSERT_EQ(alloc_file, files1[2]);
+
+  // filegroup2 should allocate from file at index 3
+  ASSERT_OK(GroupAllocateSpace(&file_group2, alloc_size, &alloc_file, &offset));
+  ASSERT_EQ(alloc_file, files2[3]);
+
+  // filegroup should allocate from file at index 5
+  ASSERT_OK(GroupAllocateSpace(&file_group, alloc_size, &alloc_file, &offset));
+  ASSERT_EQ(alloc_file, files[5]);
+
+  // Closing filegroup should result in freeing allocation at index 0 and 5
+  file_group.Close();
+
+  // filegroup1 should allocate from file at index 0
+  ASSERT_OK(GroupAllocateSpace(&file_group1, alloc_size, &alloc_file, &offset));
+  ASSERT_EQ(alloc_file, files1[0]);
+
+  // Closing filegroup2 should result in freeing allocation at index 3
+  file_group2.Close();
+
+  // filegroup1 should allocate from file at index 3
+  ASSERT_OK(GroupAllocateSpace(&file_group1, alloc_size, &alloc_file, &offset));
+  ASSERT_EQ(alloc_file, files1[3]);
+
+  // filegroup1 should allocate from file at index 5
+  ASSERT_OK(GroupAllocateSpace(&file_group1, alloc_size, &alloc_file, &offset));
+  ASSERT_EQ(alloc_file, files1[5]);
+
+  file_group1.Close();
+}
 } // namespace impala
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index fd0a48a..86e3dbd 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -49,6 +49,7 @@
 #include "util/pretty-printer.h"
 #include "util/runtime-profile-counters.h"
 #include "util/scope-exit-trigger.h"
+#include "util/string-parser.h"
 
 #include "common/names.h"
 
@@ -74,11 +75,18 @@ DEFINE_bool(disk_spill_punch_holes, false,
 DEFINE_string(scratch_dirs, "/tmp",
     "Writable scratch directories. "
     "This is a comma-separated list of directories. Each directory is "
-    "specified as the directory path and an optional limit on the bytes that will "
-    "be allocated in that directory. If the optional limit is provided, the path and "
+    "specified as the directory path, an optional limit on the bytes that will "
+    "be allocated in that directory, and an optional priority for the directory. "
+    "If the optional limit is provided, the path and "
     "the limit are separated by a colon. E.g. '/dir1:10G,/dir2:5GB,/dir3' will allow "
     "allocating up to 10GB of scratch in /dir1, 5GB of scratch in /dir2 and an "
-    "unlimited amount in /dir3.");
+    "unlimited amount in /dir3. "
+    "If the optional priority is provided, the path and the limit and priority are "
+    "separated by colon. Priority based spilling will result in directories getting "
+    "selected as a spill target based on their priority. The lower the numerical value "
+    "the higher the priority. E.g. '/dir1:10G:0,/dir2:5GB:1,/dir3::1', will cause "
+    "spilling to first fill up '/dir1' followed by using '/dir2' and '/dir3' in a "
+    "round robin manner.");
 DEFINE_bool(allow_multiple_scratch_dirs_per_device, true,
     "If false and --scratch_dirs contains multiple directories on the same device, "
     "then only the first writable directory is used");
@@ -86,6 +94,7 @@ DEFINE_bool(allow_multiple_scratch_dirs_per_device, true,
 using boost::algorithm::is_any_of;
 using boost::algorithm::join;
 using boost::algorithm::split;
+using boost::algorithm::token_compress_off;
 using boost::algorithm::token_compress_on;
 using boost::filesystem::absolute;
 using boost::filesystem::path;
@@ -165,41 +174,57 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dir_specifiers,
               ExecEnv::GetInstance()->process_mem_tracker()));
     }
   }
-  vector<TmpDir> tmp_dirs;
+  vector<std::unique_ptr<TmpDir>> tmp_dirs;
   // Parse the directory specifiers. Don't return an error on parse errors, just log a
   // warning - we don't want to abort process startup because of misconfigured scratch,
   // since queries will generally still be runnable.
   for (const string& tmp_dir_spec : tmp_dir_specifiers) {
     vector<string> toks;
-    split(toks, tmp_dir_spec, is_any_of(":"), token_compress_on);
-    if (toks.size() > 2) {
+    split(toks, tmp_dir_spec, is_any_of(":"), token_compress_off);
+    if (toks.size() > 3) {
       LOG(ERROR) << "Could not parse temporary dir specifier, too many colons: '"
                  << tmp_dir_spec << "'";
       continue;
     }
     int64_t bytes_limit = numeric_limits<int64_t>::max();
-    if (toks.size() == 2) {
+    if (toks.size() >= 2) {
       bool is_percent;
       bytes_limit = ParseUtil::ParseMemSpec(toks[1], &is_percent, 0);
       if (bytes_limit < 0 || is_percent) {
-        LOG(ERROR) << "Malformed data cache capacity configuration '" << tmp_dir_spec
-                   << "'";
+        LOG(ERROR) << "Malformed scratch directory capacity configuration '"
+                   << tmp_dir_spec << "'";
         continue;
       } else if (bytes_limit == 0) {
         // Interpret -1, 0 or empty string as no limit.
         bytes_limit = numeric_limits<int64_t>::max();
       }
     }
+    int priority = numeric_limits<int>::max();
+    if (toks.size() == 3 && !toks[2].empty()) {
+      StringParser::ParseResult result;
+      priority = StringParser::StringToInt<int>(toks[2].data(), toks[2].size(), &result);
+      if (result != StringParser::PARSE_SUCCESS) {
+        LOG(ERROR) << "Malformed scratch directory priority configuration '"
+                   << tmp_dir_spec << "'";
+        continue;
+      }
+    }
     IntGauge* bytes_used_metric = metrics->AddGauge(
         SCRATCH_DIR_BYTES_USED_FORMAT, 0, Substitute("$0", tmp_dirs.size()));
-    tmp_dirs.emplace_back(toks[0], bytes_limit, bytes_used_metric);
+    tmp_dirs.emplace_back(new TmpDir(toks[0], bytes_limit, priority, bytes_used_metric));
   }
 
+  // Sort the tmp directories by priority.
+  std::sort(tmp_dirs.begin(), tmp_dirs.end(),
+      [](const std::unique_ptr<TmpDir>& a, const std::unique_ptr<TmpDir>& b) {
+        return a->priority < b->priority;
+      });
+
   vector<bool> is_tmp_dir_on_disk(DiskInfo::num_disks(), false);
   // For each tmp directory, find the disk it is on,
   // so additional tmp directories on the same disk can be skipped.
   for (int i = 0; i < tmp_dirs.size(); ++i) {
-    path tmp_path(trim_right_copy_if(tmp_dirs[i].path, is_any_of("/")));
+    path tmp_path(trim_right_copy_if(tmp_dirs[i]->path, is_any_of("/")));
     tmp_path = absolute(tmp_path);
     path scratch_subdir_path(tmp_path / TMP_SUB_DIR_NAME);
     // tmp_path must be a writable directory.
@@ -228,9 +253,9 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dir_specifiers,
         if (disk_id >= 0) is_tmp_dir_on_disk[disk_id] = true;
         LOG(INFO) << "Using scratch directory " << scratch_subdir_path.string() << " on "
                   << "disk " << disk_id
-                  << " limit: " << PrettyPrinter::PrintBytes(tmp_dirs[i].bytes_limit);
-        tmp_dirs_.emplace_back(scratch_subdir_path.string(), tmp_dirs[i].bytes_limit,
-            tmp_dirs[i].bytes_used_metric);
+                  << " limit: " << PrettyPrinter::PrintBytes(tmp_dirs[i]->bytes_limit);
+        tmp_dirs_.emplace_back(scratch_subdir_path.string(), tmp_dirs[i]->bytes_limit,
+            tmp_dirs[i]->priority, tmp_dirs[i]->bytes_used_metric);
       } else {
         LOG(WARNING) << "Could not remove and recreate directory "
                      << scratch_subdir_path.string() << ": cannot use it for scratch. "
@@ -396,6 +421,23 @@ TmpFileGroup::TmpFileGroup(TmpFileMgr* tmp_file_mgr, DiskIoMgr* io_mgr,
     free_ranges_(64) {
   DCHECK(tmp_file_mgr != nullptr);
   io_ctx_ = io_mgr_->RegisterContext();
+  // Populate the priority based index ranges.
+  const std::vector<TmpDir>& tmp_dirs = tmp_file_mgr_->tmp_dirs_;
+  if (tmp_dirs.size() > 0) {
+    int start_index = 0;
+    int priority = tmp_dirs[0].priority;
+    for (int i = 0; i < tmp_dirs.size() - 1; ++i) {
+      priority = tmp_dirs[i].priority;
+      const int next_priority = tmp_dirs[i+1].priority;
+      if (next_priority != priority) {
+        tmp_files_index_range_.emplace(priority, TmpFileIndexRange(start_index, i));
+        start_index = i + 1;
+        priority = next_priority;
+      }
+    }
+    tmp_files_index_range_.emplace(priority,
+      TmpFileIndexRange(start_index, tmp_dirs.size() - 1));
+  }
 }
 
 TmpFileGroup::~TmpFileGroup() {
@@ -416,9 +458,16 @@ Status TmpFileGroup::CreateFiles() {
     ++files_allocated;
   }
   DCHECK_EQ(tmp_files_.size(), files_allocated);
+  DCHECK_EQ(tmp_file_mgr_->tmp_dirs_.size(), tmp_files_.size());
   if (tmp_files_.size() == 0) return ScratchAllocationFailedStatus({});
-  // Start allocating on a random device to avoid overloading the first device.
-  next_allocation_index_ = rand() % tmp_files_.size();
+  // Initialize the next allocation index for each priority.
+  for (const auto& entry: tmp_files_index_range_) {
+    const int priority = entry.first;
+    const int start = entry.second.start;
+    const int end = entry.second.end;
+    // Start allocating on a random device to avoid overloading the first device.
+    next_allocation_index_.emplace(priority, start + rand() % (end - start + 1));
+  }
   return Status::OK();
 }
 
@@ -479,22 +528,31 @@ Status TmpFileGroup::AllocateSpace(
   // that some disks were at capacity.
   vector<int> at_capacity_dirs;
 
-  // Find the next physical file in round-robin order and allocate a range from it.
-  for (int attempt = 0; attempt < tmp_files_.size(); ++attempt) {
-    int idx = next_allocation_index_;
-    next_allocation_index_ = (next_allocation_index_ + 1) % tmp_files_.size();
-    *tmp_file = tmp_files_[idx].get();
-    if ((*tmp_file)->is_blacklisted()) continue;
-
-    // Check the per-directory limit.
-    if (!(*tmp_file)->AllocateSpace(scratch_range_bytes, file_offset)) {
-      at_capacity_dirs.push_back(idx);
-      continue;
+  // Find the next physical file in priority based round-robin order and allocate a range
+  // from it.
+  for (const auto& entry: tmp_files_index_range_) {
+    const int priority = entry.first;
+    const int start = entry.second.start;
+    const int end = entry.second.end;
+    DCHECK (0 <= start && start <= end && end < tmp_files_.size())
+      << "Invalid index range: [" << start << ", " << end << "] "
+      << "tmp_files_.size(): " << tmp_files_.size();
+    for (int index = start; index <= end; ++index) {
+      const int idx = next_allocation_index_[priority];
+      next_allocation_index_[priority] = start + (idx - start + 1) % (end - start + 1);
+      *tmp_file = tmp_files_[idx].get();
+      if ((*tmp_file)->is_blacklisted()) continue;
+
+      // Check the per-directory limit.
+      if (!(*tmp_file)->AllocateSpace(scratch_range_bytes, file_offset)) {
+        at_capacity_dirs.push_back(idx);
+        continue;
+      }
+      scratch_space_bytes_used_counter_->Add(scratch_range_bytes);
+      tmp_file_mgr_->scratch_bytes_used_metric_->Increment(scratch_range_bytes);
+      current_bytes_allocated_ += scratch_range_bytes;
+      return Status::OK();
     }
-    scratch_space_bytes_used_counter_->Add(scratch_range_bytes);
-    tmp_file_mgr_->scratch_bytes_used_metric_->Increment(scratch_range_bytes);
-    current_bytes_allocated_ += scratch_range_bytes;
-    return Status::OK();
   }
   return ScratchAllocationFailedStatus(at_capacity_dirs);
 }
@@ -737,7 +795,12 @@ string TmpFileGroup::DebugString() {
   stringstream ss;
   ss << "TmpFileGroup " << this << " bytes limit " << bytes_limit_
      << " current bytes allocated " << current_bytes_allocated_
-     << " next allocation index " << next_allocation_index_ << " writes "
+     << " next allocation index [ ";
+  // Get priority based allocation index.
+  for (const auto& entry: next_allocation_index_) {
+    ss << " (priority: " << entry.first << ", index: " << entry.second << "), ";
+  }
+  ss << "] writes "
      << write_counter_->value() << " bytes written " << bytes_written_counter_->value()
      << " uncompressed bytes written " << uncompressed_bytes_written_counter_->value()
      << " reads " << read_counter_->value() << " bytes read "
diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h
index 3ba3e21..5f81497 100644
--- a/be/src/runtime/tmp-file-mgr.h
+++ b/be/src/runtime/tmp-file-mgr.h
@@ -18,7 +18,9 @@
 #pragma once
 
 #include <functional>
+#include <map>
 #include <memory>
+#include <unordered_map>
 #include <utility>
 
 #include <mutex>
@@ -98,15 +100,20 @@ class TmpFileMgr {
 
   /// A configured temporary directory that TmpFileMgr allocates files in.
   struct TmpDir {
-    TmpDir(const std::string& path, int64_t bytes_limit, IntGauge* bytes_used_metric)
-      : path(path), bytes_limit(bytes_limit), bytes_used_metric(bytes_used_metric) {}
+    TmpDir(const std::string& path, int64_t bytes_limit, int priority,
+      IntGauge* bytes_used_metric)
+      : path(path), bytes_limit(bytes_limit), priority(priority),
+        bytes_used_metric(bytes_used_metric) {}
 
     /// Path to the temporary directory.
     const std::string path;
 
     /// Limit on bytes that should be written to this path. Set to maximum value
     /// of int64_t if there is no limit.
-    int64_t const bytes_limit;
+    const int64_t bytes_limit;
+
+    /// Scratch directory priority.
+    const int priority;
 
     /// The current bytes of scratch used for this temporary directory.
     IntGauge* const bytes_used_metric;
@@ -377,16 +384,31 @@ class TmpFileGroup {
   /// Protects below members.
   SpinLock lock_;
 
-  /// List of files representing the TmpFileGroup.
+  /// List of files representing the TmpFileGroup. Files are ordered by the priority of
+  /// the related TmpDir.
   std::vector<std::unique_ptr<TmpFile>> tmp_files_;
 
+  /// Index Range in the 'tmp_files'. Used to keep track of index range
+  /// corresponding to a given priority.
+  struct TmpFileIndexRange {
+    TmpFileIndexRange(int start, int end)
+      : start(start), end(end) {}
+    // Start index of the range.
+    const int start;
+    // End index of the range.
+    const int end;
+  };
+  /// Map storing the index range in the 'tmp_files', corresponding to scratch dirs's
+  /// priority.
+  std::map<int, TmpFileIndexRange> tmp_files_index_range_;
+
   /// Total space allocated in this group's files.
   int64_t current_bytes_allocated_;
 
   /// Index into 'tmp_files' denoting the file to which the next temporary file range
-  /// should be allocated from. Used to implement round-robin allocation from temporary
-  /// files.
-  int next_allocation_index_;
+  /// should be allocated from, for a given priority. Used to implement round-robin
+  /// allocation from temporary files.
+  std::unordered_map<int, int> next_allocation_index_;
 
   /// Each vector in free_ranges_[i] is a vector of File/offset pairs for free scratch
   /// ranges of length 2^i bytes. Has 64 entries so that every int64_t length has a
diff --git a/tests/custom_cluster/test_scratch_disk.py b/tests/custom_cluster/test_scratch_disk.py
index a1daa5e..805489e 100644
--- a/tests/custom_cluster/test_scratch_disk.py
+++ b/tests/custom_cluster/test_scratch_disk.py
@@ -24,6 +24,7 @@ import stat
 import tempfile
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.verifiers.metric_verifier import MetricVerifier
 
 class TestScratchDir(CustomClusterTestSuite):
   @classmethod
@@ -188,3 +189,69 @@ class TestScratchDir(CustomClusterTestSuite):
     impalad = self.cluster.get_any_impalad()
     client = impalad.service.create_beeswax_client()
     self.execute_query_expect_success(client, self.spill_query, exec_option)
+
+  @pytest.mark.execute_serially
+  def test_scratch_dirs_default_priority(self, vector):
+    """ 5 empty directories are created in the /tmp directory and we verify that all
+        of those directories are used as scratch disk. By default, all directories
+        should have the same priority and so all should be used in a round robin
+        manner."""
+    normal_dirs = self.generate_dirs(5)
+    self._start_impala_cluster([
+      '--impalad_args=-logbuflevel=-1 -scratch_dirs={0}'.format(','.join(normal_dirs)),
+      '--impalad_args=--allow_multiple_scratch_dirs_per_device=true'], cluster_size=1,
+      expected_num_executors=1)
+    self.assert_impalad_log_contains("INFO", "Using scratch directory ",
+                                    expected_count=len(normal_dirs))
+    vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit
+    impalad = self.cluster.impalads[0]
+    client = impalad.service.create_beeswax_client()
+    handle = self.execute_query_async_using_client(client, self.spill_query, vector)
+    verifier = MetricVerifier(impalad.service)
+    verifier.wait_for_metric("impala-server.num-fragments-in-flight", 2)
+    metrics0 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-0')
+    metrics1 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-1')
+    metrics2 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-2')
+    metrics3 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-3')
+    metrics4 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-4')
+    assert (metrics0 > 0 and metrics1 > 0 and metrics2 > 0 and metrics3 > 0 and
+      metrics4 > 0)
+    results = client.fetch(self.spill_query, handle)
+    assert results.success
+    client.close_query(handle)
+    client.close()
+
+  @pytest.mark.execute_serially
+  def test_scratch_dirs_prioritized_spill(self, vector):
+    """ 5 empty directories are created in the /tmp directory and we verify that only
+        the directories with highest priority are used as scratch disk."""
+    normal_dirs = self.generate_dirs(5)
+    normal_dirs[0] = '{0}::{1}'.format(normal_dirs[0], 1)
+    normal_dirs[1] = '{0}::{1}'.format(normal_dirs[1], 0)
+    normal_dirs[2] = '{0}::{1}'.format(normal_dirs[2], 1)
+    normal_dirs[3] = '{0}::{1}'.format(normal_dirs[3], 0)
+    normal_dirs[4] = '{0}::{1}'.format(normal_dirs[4], 1)
+    self._start_impala_cluster([
+      '--impalad_args=-logbuflevel=-1 -scratch_dirs={0}'.format(','.join(normal_dirs)),
+      '--impalad_args=--allow_multiple_scratch_dirs_per_device=true'], cluster_size=1,
+      expected_num_executors=1)
+    self.assert_impalad_log_contains("INFO", "Using scratch directory ",
+                                    expected_count=len(normal_dirs))
+    vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit
+    impalad = self.cluster.impalads[0]
+    client = impalad.service.create_beeswax_client()
+    handle = self.execute_query_async_using_client(client, self.spill_query, vector)
+    verifier = MetricVerifier(impalad.service)
+    verifier.wait_for_metric("impala-server.num-fragments-in-flight", 2)
+    metrics0 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-0')
+    metrics1 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-1')
+    metrics2 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-2')
+    metrics3 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-3')
+    metrics4 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used.dir-4')
+    # dir1 and dir3 have highest priority and will be used as scratch disk.
+    assert (metrics1 > 0 and metrics3 > 0 and metrics0 == 0 and metrics2 == 0 and
+      metrics4 == 0)
+    results = client.fetch(self.spill_query, handle)
+    assert results.success
+    client.close_query(handle)
+    client.close()