You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/08/07 03:27:19 UTC

[impala] 02/02: IMPALA-8376: directory limits for scratch usage

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 411189a8d733a66c363c72f8c404123d68640a3e
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Fri Aug 2 16:37:02 2019 -0700

    IMPALA-8376: directory limits for scratch usage
    
    This extends the --scratch_dirs syntax to support specifying a max
    capacity per directory, similarly to the --data_cache configuration.
    The capacity is delimited from the directory name with ":" and
    uses the usual syntax for specifying memory. The following are
    valid arguments:
    * --scratch_dirs=/dir1,/dir2 (no limits)
    * --scratch_dirs=/dir1,/dir2:25G (only a limit on /dir2)
    * --scratch_dirs=/dir1:5MB,/dir2 (only a limit on /dir1)
    * --scratch_dirs=/dir1:-1,/dir2:0 (alternative ways of
      expressing no limit)
    
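    The usage is tracked with a metric per directory. An allocation in a
    directory fails if it would take that directory's usage over its limit,
    in which case the other scratch directories are tried instead. If every
    directory is at capacity, the SCRATCH_ALLOCATION_FAILED error lists the
    directories that were at capacity. The per-directory usage metrics are
    exposed as
    tmp-file-mgr.scratch-space-bytes-used.dir-0,
    tmp-file-mgr.scratch-space-bytes-used.dir-1, etc.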
    
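    Also add support for parsing terabyte specifiers (e.g. "8T", "12tb") to
    ParseUtil::ParseMemSpec(), a utility function that is used for parsing
    many configurations. As a result "1tb" is now a valid memory value, so
    tests and docs that used it as an example of an unparseable limit now
    use "1pb" instead.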
    
    Testing:
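    Added unit tests to exercise TmpFileMgr's per-directory limits, including
    the case where all directories are at capacity, and the parsing of the
    extended --scratch_dirs syntax. Extended the ParseMemSpec tests to cover
    terabyte specifiers.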
    
    Manually ran a spilling query on an impalad with multiple scratch dirs
    configured with different limits. Confirmed via metrics that the
    capacities were enforced.
    
    Change-Id: I696146a65dbb97f1ba200ae472358ae2db6eb441
    Reviewed-on: http://gerrit.cloudera.org:8080/13986
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/tmp-file-mgr-internal.h |  15 ++-
 be/src/runtime/tmp-file-mgr-test.cc    | 203 +++++++++++++++++++++++++++++++++
 be/src/runtime/tmp-file-mgr.cc         | 128 +++++++++++++++++----
 be/src/runtime/tmp-file-mgr.h          |  33 +++++-
 be/src/service/query-options-test.cc   |   8 +-
 be/src/util/parse-util-test.cc         |  11 ++
 be/src/util/parse-util.cc              |   6 +
 common/thrift/generate_error_codes.py  |   3 +-
 common/thrift/metrics.json             |  10 ++
 docs/topics/impala_mem_limit.xml       |   6 +-
 10 files changed, 382 insertions(+), 41 deletions(-)

diff --git a/be/src/runtime/tmp-file-mgr-internal.h b/be/src/runtime/tmp-file-mgr-internal.h
index 5d11c4d..090a019 100644
--- a/be/src/runtime/tmp-file-mgr-internal.h
+++ b/be/src/runtime/tmp-file-mgr-internal.h
@@ -36,11 +36,13 @@ class TmpFileMgr::File {
  public:
   File(FileGroup* file_group, DeviceId device_id, const std::string& path);
 
-  /// Allocates 'num_bytes' bytes in this file for a new block of data.
-  /// The file size is increased by a call to truncate() if necessary.
-  /// Sets 'offset' to the file offset of the first byte in the allocated
+  /// Allocates 'num_bytes' bytes in this file for a new block of data if there is
+  /// free capacity in this temporary directory. If there is insufficient capacity,
+  /// return false. Otherwise, update state and return true.
+  /// This function does not actually perform any file operations.
+  /// Sets 'offset' to the file offset of the first byte in the allocated
   /// range on success.
-  void AllocateSpace(int64_t num_bytes, int64_t* offset);
+  bool AllocateSpace(int64_t num_bytes, int64_t* offset);
 
   /// Called when an IO error is encountered for this file. Logs the error and blacklists
   /// the file.
@@ -85,7 +87,10 @@ class TmpFileMgr::File {
 
   /// Set to true to indicate that we shouldn't allocate any more space in this file.
   bool blacklisted_;
+
+  /// Helper to get the TmpDir that this file is associated with.
+  TmpDir* GetDir();
 };
-}
+} // namespace impala
 
 #endif
diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc
index 5bed3b4..9b852a6 100644
--- a/be/src/runtime/tmp-file-mgr-test.cc
+++ b/be/src/runtime/tmp-file-mgr-test.cc
@@ -17,6 +17,7 @@
 
 #include <cstdio>
 #include <cstdlib>
+#include <limits>
 #include <numeric>
 
 #include <boost/filesystem.hpp>
@@ -52,6 +53,11 @@ namespace impala {
 
 using namespace io;
 
+static const int64_t KILOBYTE = 1024L;
+static const int64_t MEGABYTE = 1024L * KILOBYTE;
+static const int64_t GIGABYTE = 1024L * MEGABYTE;
+static const int64_t TERABYTE = 1024L * GIGABYTE;
+
 class TmpFileMgrTest : public ::testing::Test {
  public:
   virtual void SetUp() {
@@ -76,6 +82,18 @@ class TmpFileMgrTest : public ::testing::Test {
 
   DiskIoMgr* io_mgr() { return test_env_->exec_env()->disk_io_mgr(); }
 
+  /// Helper to create a TmpFileMgr and initialise it with InitCustom(). Adds the mgr to
+  /// 'obj_pool_' for automatic cleanup at the end of each test. Fails the test if
+  /// InitCustom() fails.
+  TmpFileMgr* CreateTmpFileMgr(const string& tmp_dirs_spec) {
+    // Allocate a new metrics group for each TmpFileMgr so they don't get confused by
+    // the pre-existing metrics (TmpFileMgr assumes it's a singleton in product code).
+    MetricGroup* metrics = obj_pool_.Add(new MetricGroup(""));
+    TmpFileMgr* mgr = obj_pool_.Add(new TmpFileMgr());
+    EXPECT_OK(mgr->InitCustom(tmp_dirs_spec, false, metrics));
+    return mgr;
+  }
+
   /// Check that metric values are consistent with TmpFileMgr state.
   void CheckMetrics(TmpFileMgr* tmp_file_mgr) {
     vector<TmpFileMgr::DeviceId> active = tmp_file_mgr->ActiveTmpDevices();
@@ -122,6 +140,11 @@ class TmpFileMgrTest : public ::testing::Test {
     return Status::OK();
   }
 
+  /// Helper to get the private tmp_dirs_ member.
+  static const vector<TmpFileMgr::TmpDir>& GetTmpDirs(TmpFileMgr* mgr) {
+    return mgr->tmp_dirs_;
+  }
+
   /// Helper to call the private TmpFileMgr::NewFile() method.
   static void NewFile(TmpFileMgr* mgr, TmpFileMgr::FileGroup* group,
       TmpFileMgr::DeviceId device_id, unique_ptr<TmpFileMgr::File>* new_file) {
@@ -644,4 +667,184 @@ TEST_F(TmpFileMgrTest, TestHWMMetric) {
   file_group_2.Close();
   checkHWMMetrics(0, 2 * LIMIT);
 }
+
+// Test that usage per directory is tracked correctly and per-directory limits are
+// enforced. Sets up several scratch directories, some with limits, and checks
+// that the allocations occur in the right directories.
+TEST_F(TmpFileMgrTest, TestDirectoryLimits) {
+  vector<string> tmp_dirs({"/tmp/tmp-file-mgr-test.1", "/tmp/tmp-file-mgr-test.2",
+      "/tmp/tmp-file-mgr-test.3"});
+  vector<string> tmp_dir_specs({"/tmp/tmp-file-mgr-test.1:512",
+      "/tmp/tmp-file-mgr-test.2:1k", "/tmp/tmp-file-mgr-test.3"});
+  RemoveAndCreateDirs(tmp_dirs);
+  TmpFileMgr tmp_file_mgr;
+  ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dir_specs, false, metrics_.get()));
+
+  TmpFileMgr::FileGroup file_group_1(
+      &tmp_file_mgr, io_mgr(), RuntimeProfile::Create(&obj_pool_, "p1"), TUniqueId());
+  TmpFileMgr::FileGroup file_group_2(
+      &tmp_file_mgr, io_mgr(), RuntimeProfile::Create(&obj_pool_, "p2"), TUniqueId());
+
+  vector<TmpFileMgr::File*> files;
+  ASSERT_OK(CreateFiles(&file_group_1, &files));
+  ASSERT_OK(CreateFiles(&file_group_2, &files));
+
+  IntGauge* dir1_usage = metrics_->FindMetricForTesting<IntGauge>(
+      "tmp-file-mgr.scratch-space-bytes-used.dir-0");
+  IntGauge* dir2_usage = metrics_->FindMetricForTesting<IntGauge>(
+      "tmp-file-mgr.scratch-space-bytes-used.dir-1");
+  IntGauge* dir3_usage = metrics_->FindMetricForTesting<IntGauge>(
+      "tmp-file-mgr.scratch-space-bytes-used.dir-2");
+
+  // A power-of-two so that FileGroup allocates exactly this amount of scratch space.
+  const int64_t ALLOC_SIZE = 512;
+  int64_t offset;
+  TmpFileMgr::File* alloc_file;
+
+  // Allocate three times - once per directory. We expect these allocations to go through
+  // so we should have one allocation in each directory.
+  SetNextAllocationIndex(&file_group_1, 0);
+  for (int i = 0; i < tmp_dir_specs.size(); ++i) {
+    ASSERT_OK(GroupAllocateSpace(&file_group_1, ALLOC_SIZE, &alloc_file, &offset));
+  }
+  EXPECT_EQ(ALLOC_SIZE, dir1_usage->GetValue());
+  EXPECT_EQ(ALLOC_SIZE, dir2_usage->GetValue());
+  EXPECT_EQ(ALLOC_SIZE, dir3_usage->GetValue());
+
+  // This time we should hit the limit on the first directory. Do this from a
+  // different file group to show that limits are enforced across file groups.
+  for (int i = 0; i < 2; ++i) {
+    ASSERT_OK(GroupAllocateSpace(&file_group_2, ALLOC_SIZE, &alloc_file, &offset));
+  }
+  EXPECT_EQ(ALLOC_SIZE, dir1_usage->GetValue());
+  EXPECT_EQ(2 * ALLOC_SIZE, dir2_usage->GetValue());
+  EXPECT_EQ(2 * ALLOC_SIZE, dir3_usage->GetValue());
+
+  // Now we're at the limits on two directories, so all allocations should go to the
+  // last directory without a limit.
+  for (int i = 0; i < 100; ++i) {
+    ASSERT_OK(GroupAllocateSpace(&file_group_2, ALLOC_SIZE, &alloc_file, &offset));
+  }
+  EXPECT_EQ(ALLOC_SIZE, dir1_usage->GetValue());
+  EXPECT_EQ(2 * ALLOC_SIZE, dir2_usage->GetValue());
+  EXPECT_EQ(102 * ALLOC_SIZE, dir3_usage->GetValue());
+
+  file_group_2.Close();
+  // Metrics should be decremented when file_group_2 deletes its underlying files.
+  EXPECT_EQ(ALLOC_SIZE, dir1_usage->GetValue());
+  EXPECT_EQ(ALLOC_SIZE, dir2_usage->GetValue());
+  EXPECT_EQ(ALLOC_SIZE, dir3_usage->GetValue());
+
+  // We should be able to reuse the space freed up.
+  ASSERT_OK(GroupAllocateSpace(&file_group_1, ALLOC_SIZE, &alloc_file, &offset));
+
+  EXPECT_EQ(2 * ALLOC_SIZE, dir2_usage->GetValue());
+  file_group_1.Close();
+  EXPECT_EQ(0, dir1_usage->GetValue());
+  EXPECT_EQ(0, dir2_usage->GetValue());
+  EXPECT_EQ(0, dir3_usage->GetValue());
+}
+
+// Test the case when all per-directory limits are hit. We expect to return a status
+// and fail gracefully.
+TEST_F(TmpFileMgrTest, TestDirectoryLimitsExhausted) {
+  vector<string> tmp_dirs({"/tmp/tmp-file-mgr-test.1", "/tmp/tmp-file-mgr-test.2"});
+  vector<string> tmp_dir_specs(
+      {"/tmp/tmp-file-mgr-test.1:256kb", "/tmp/tmp-file-mgr-test.2:1mb"});
+  const int64_t DIR1_LIMIT = 256L * 1024L;
+  const int64_t DIR2_LIMIT = 1024L * 1024L;
+  RemoveAndCreateDirs(tmp_dirs);
+  TmpFileMgr tmp_file_mgr;
+  ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dir_specs, false, metrics_.get()));
+
+  TmpFileMgr::FileGroup file_group_1(
+      &tmp_file_mgr, io_mgr(), RuntimeProfile::Create(&obj_pool_, "p1"), TUniqueId());
+  TmpFileMgr::FileGroup file_group_2(
+      &tmp_file_mgr, io_mgr(), RuntimeProfile::Create(&obj_pool_, "p2"), TUniqueId());
+
+  vector<TmpFileMgr::File*> files;
+  ASSERT_OK(CreateFiles(&file_group_1, &files));
+  ASSERT_OK(CreateFiles(&file_group_2, &files));
+
+  IntGauge* dir1_usage = metrics_->FindMetricForTesting<IntGauge>(
+      "tmp-file-mgr.scratch-space-bytes-used.dir-0");
+  IntGauge* dir2_usage = metrics_->FindMetricForTesting<IntGauge>(
+      "tmp-file-mgr.scratch-space-bytes-used.dir-1");
+
+  // A power-of-two so that FileGroup allocates exactly this amount of scratch space.
+  const int64_t ALLOC_SIZE = 512;
+  const int64_t MAX_ALLOCATIONS = (DIR1_LIMIT + DIR2_LIMIT) / ALLOC_SIZE;
+  int64_t offset;
+  TmpFileMgr::File* alloc_file;
+
+  // Allocate exactly the maximum total capacity of the directories.
+  SetNextAllocationIndex(&file_group_1, 0);
+  for (int i = 0; i < MAX_ALLOCATIONS; ++i) {
+    ASSERT_OK(GroupAllocateSpace(&file_group_1, ALLOC_SIZE, &alloc_file, &offset));
+  }
+  EXPECT_EQ(DIR1_LIMIT, dir1_usage->GetValue());
+  EXPECT_EQ(DIR2_LIMIT, dir2_usage->GetValue());
+  // The directories are at capacity, so allocations should fail.
+  Status err1 = GroupAllocateSpace(&file_group_1, ALLOC_SIZE, &alloc_file, &offset);
+  ASSERT_EQ(err1.code(), TErrorCode::SCRATCH_ALLOCATION_FAILED);
+  Status err2 = GroupAllocateSpace(&file_group_2, ALLOC_SIZE, &alloc_file, &offset);
+  ASSERT_EQ(err2.code(), TErrorCode::SCRATCH_ALLOCATION_FAILED);
+
+  // A FileGroup should recover once allocations are released, i.e. it does not
+  // permanently block allocating files from the group.
+  file_group_1.Close();
+  ASSERT_OK(GroupAllocateSpace(&file_group_2, ALLOC_SIZE, &alloc_file, &offset));
+  file_group_2.Close();
+}
+
+// Test the directory parsing logic, including the various error cases.
+TEST_F(TmpFileMgrTest, TestDirectoryLimitParsing) {
+  RemoveAndCreateDirs({"/tmp/tmp-file-mgr-test1", "/tmp/tmp-file-mgr-test2",
+      "/tmp/tmp-file-mgr-test3", "/tmp/tmp-file-mgr-test4", "/tmp/tmp-file-mgr-test5",
+      "/tmp/tmp-file-mgr-test6", "/tmp/tmp-file-mgr-test7"});
+  // Configure various directories with valid formats.
+  auto& dirs = GetTmpDirs(
+      CreateTmpFileMgr("/tmp/tmp-file-mgr-test1:5g,/tmp/tmp-file-mgr-test2,"
+                       "/tmp/tmp-file-mgr-test3:1234,/tmp/tmp-file-mgr-test4:99999999,"
+                       "/tmp/tmp-file-mgr-test5:200tb,/tmp/tmp-file-mgr-test6:100MB"));
+  EXPECT_EQ(6, dirs.size());
+  EXPECT_EQ(5 * GIGABYTE, dirs[0].bytes_limit);
+  EXPECT_EQ(numeric_limits<int64_t>::max(), dirs[1].bytes_limit);
+  EXPECT_EQ(1234, dirs[2].bytes_limit);
+  EXPECT_EQ(99999999, dirs[3].bytes_limit);
+  EXPECT_EQ(200 * TERABYTE, dirs[4].bytes_limit);
+  EXPECT_EQ(100 * MEGABYTE, dirs[5].bytes_limit);
+
+  // Various invalid limit formats result in the directory getting skipped.
+  // Include a valid dir on the end to ensure that we don't short-circuit all
+  // directories.
+  auto& dirs2 = GetTmpDirs(
+      CreateTmpFileMgr("/tmp/tmp-file-mgr-test1:foo,/tmp/tmp-file-mgr-test2:?,"
+                       "/tmp/tmp-file-mgr-test3:1.2.3.4,/tmp/tmp-file-mgr-test4: ,"
+                       "/tmp/tmp-file-mgr-test5:5pb,/tmp/tmp-file-mgr-test6:10%,"
+                       "/tmp/tmp-file-mgr-test1:100"));
+  EXPECT_EQ(1, dirs2.size());
+  EXPECT_EQ("/tmp/tmp-file-mgr-test1/impala-scratch", dirs2[0].path);
+  EXPECT_EQ(100, dirs2[0].bytes_limit);
+
+  // Various valid ways of specifying "unlimited".
+  auto& dirs3 =
+      GetTmpDirs(CreateTmpFileMgr("/tmp/tmp-file-mgr-test1:,/tmp/tmp-file-mgr-test2:-1,"
+                                  "/tmp/tmp-file-mgr-test3,/tmp/tmp-file-mgr-test4:0"));
+  EXPECT_EQ(4, dirs3.size());
+  for (const auto& dir : dirs3) {
+    EXPECT_EQ(numeric_limits<int64_t>::max(), dir.bytes_limit);
+  }
+
+  // Extra colons
+  auto& dirs4 = GetTmpDirs(
+      CreateTmpFileMgr("/tmp/tmp-file-mgr-test1:1:,/tmp/tmp-file-mgr-test2:10mb::"));
+  EXPECT_EQ(0, dirs4.size());
+
+  // Empty strings.
+  auto& nodirs = GetTmpDirs(CreateTmpFileMgr(""));
+  EXPECT_EQ(0, nodirs.size());
+  auto& empty_paths = GetTmpDirs(CreateTmpFileMgr(","));
+  EXPECT_EQ(2, empty_paths.size());
 }
+} // namespace impala
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index 400d4d6..7954a62 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -17,6 +17,8 @@
 
 #include "runtime/tmp-file-mgr.h"
 
+#include <limits>
+
 #include <boost/algorithm/string.hpp>
 #include <boost/filesystem.hpp>
 #include <boost/lexical_cast.hpp>
@@ -35,6 +37,7 @@
 #include "util/debug-util.h"
 #include "util/disk-info.h"
 #include "util/filesystem-util.h"
+#include "util/parse-util.h"
 #include "util/pretty-printer.h"
 #include "util/runtime-profile-counters.h"
 
@@ -43,7 +46,14 @@
 DEFINE_bool(disk_spill_encryption, true,
     "Set this to encrypt and perform an integrity "
     "check on all data spilled to disk during a query");
-DEFINE_string(scratch_dirs, "/tmp", "Writable scratch directories");
+DEFINE_string(scratch_dirs, "/tmp",
+    "Writable scratch directories. "
+    "This is a comma-separated list of directories. Each directory is "
+    "specified as the directory path and an optional limit on the bytes that will "
+    "be allocated in that directory. If the optional limit is provided, the path and "
+    "the limit are separated by a colon. E.g. '/dir1:10G,/dir2:5GB,/dir3' will allow "
+    "allocating up to 10GB of scratch in /dir1, 5GB of scratch in /dir2 and an "
+    "unlimited amount in /dir3.");
 DEFINE_bool(allow_multiple_scratch_dirs_per_device, true,
     "If false and --scratch_dirs contains multiple directories on the same device, "
     "then only the first writable directory is used");
@@ -71,6 +81,8 @@ const string TMP_FILE_MGR_SCRATCH_SPACE_BYTES_USED_HIGH_WATER_MARK =
     "tmp-file-mgr.scratch-space-bytes-used-high-water-mark";
 const string TMP_FILE_MGR_SCRATCH_SPACE_BYTES_USED =
     "tmp-file-mgr.scratch-space-bytes-used";
+const string SCRATCH_DIR_BYTES_USED_FORMAT =
+    "tmp-file-mgr.scratch-space-bytes-used.dir-$0";
 
 TmpFileMgr::TmpFileMgr()
   : initialized_(false),
@@ -79,27 +91,61 @@ TmpFileMgr::TmpFileMgr()
     scratch_bytes_used_metric_(nullptr) {}
 
 Status TmpFileMgr::Init(MetricGroup* metrics) {
-  string tmp_dirs_spec = FLAGS_scratch_dirs;
+  return InitCustom(
+      FLAGS_scratch_dirs, !FLAGS_allow_multiple_scratch_dirs_per_device, metrics);
+}
+
+Status TmpFileMgr::InitCustom(
+    const string& tmp_dirs_spec, bool one_dir_per_device, MetricGroup* metrics) {
   vector<string> all_tmp_dirs;
   // Empty string should be interpreted as no scratch
   if (!tmp_dirs_spec.empty()) {
     split(all_tmp_dirs, tmp_dirs_spec, is_any_of(","), token_compress_on);
   }
-  return InitCustom(all_tmp_dirs, !FLAGS_allow_multiple_scratch_dirs_per_device, metrics);
+  return InitCustom(all_tmp_dirs, one_dir_per_device, metrics);
 }
 
-Status TmpFileMgr::InitCustom(const vector<string>& tmp_dirs, bool one_dir_per_device,
-      MetricGroup* metrics) {
+Status TmpFileMgr::InitCustom(const vector<string>& tmp_dir_specifiers,
+    bool one_dir_per_device, MetricGroup* metrics) {
   DCHECK(!initialized_);
-  if (tmp_dirs.empty()) {
+  if (tmp_dir_specifiers.empty()) {
     LOG(WARNING) << "Running without spill to disk: no scratch directories provided.";
   }
+  vector<TmpDir> tmp_dirs;
+  // Parse the directory specifiers. Don't return an error on parse errors, just log a
+  // warning - we don't want to abort process startup because of misconfigured scratch,
+  // since queries will generally still be runnable.
+  for (const string& tmp_dir_spec : tmp_dir_specifiers) {
+    vector<string> toks;
+    split(toks, tmp_dir_spec, is_any_of(":"), token_compress_on);
+    if (toks.size() > 2) {
+      LOG(ERROR) << "Could not parse temporary dir specifier, too many colons: '"
+                 << tmp_dir_spec << "'";
+      continue;
+    }
+    int64_t bytes_limit = numeric_limits<int64_t>::max();
+    if (toks.size() == 2) {
+      bool is_percent;
+      bytes_limit = ParseUtil::ParseMemSpec(toks[1], &is_percent, 0);
+      if (bytes_limit < 0 || is_percent) {
+        LOG(ERROR) << "Malformed data cache capacity configuration '" << tmp_dir_spec
+                   << "'";
+        continue;
+      } else if (bytes_limit == 0) {
+        // Interpret -1, 0 or empty string as no limit.
+        bytes_limit = numeric_limits<int64_t>::max();
+      }
+    }
+    IntGauge* bytes_used_metric = metrics->AddGauge(
+        SCRATCH_DIR_BYTES_USED_FORMAT, 0, Substitute("$0", tmp_dirs.size()));
+    tmp_dirs.emplace_back(toks[0], bytes_limit, bytes_used_metric);
+  }
 
   vector<bool> is_tmp_dir_on_disk(DiskInfo::num_disks(), false);
   // For each tmp directory, find the disk it is on,
   // so additional tmp directories on the same disk can be skipped.
   for (int i = 0; i < tmp_dirs.size(); ++i) {
-    path tmp_path(trim_right_copy_if(tmp_dirs[i], is_any_of("/")));
+    path tmp_path(trim_right_copy_if(tmp_dirs[i].path, is_any_of("/")));
     tmp_path = absolute(tmp_path);
     path scratch_subdir_path(tmp_path / TMP_SUB_DIR_NAME);
     // tmp_path must be a writable directory.
@@ -127,8 +173,10 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dirs, bool one_dir_per_d
       if (status.ok()) {
         if (disk_id >= 0) is_tmp_dir_on_disk[disk_id] = true;
         LOG(INFO) << "Using scratch directory " << scratch_subdir_path.string() << " on "
-                  << "disk " << disk_id;
-        tmp_dirs_.push_back(scratch_subdir_path.string());
+                  << "disk " << disk_id
+                  << " limit: " << PrettyPrinter::PrintBytes(tmp_dirs[i].bytes_limit);
+        tmp_dirs_.emplace_back(scratch_subdir_path.string(), tmp_dirs[i].bytes_limit,
+            tmp_dirs[i].bytes_used_metric);
       } else {
         LOG(WARNING) << "Could not remove and recreate directory "
                      << scratch_subdir_path.string() << ": cannot use it for scratch. "
@@ -144,7 +192,7 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dirs, bool one_dir_per_d
       metrics, TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS_LIST, set<string>());
   num_active_scratch_dirs_metric_->SetValue(tmp_dirs_.size());
   for (int i = 0; i < tmp_dirs_.size(); ++i) {
-    active_scratch_dirs_metric_->Add(tmp_dirs_[i]);
+    active_scratch_dirs_metric_->Add(tmp_dirs_[i].path);
   }
   scratch_bytes_used_metric_ =
       metrics->AddHWMGauge(TMP_FILE_MGR_SCRATCH_SPACE_BYTES_USED_HIGH_WATER_MARK,
@@ -154,7 +202,7 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dirs, bool one_dir_per_d
 
   if (tmp_dirs_.empty() && !tmp_dirs.empty()) {
     LOG(ERROR) << "Running without spill to disk: could not use any scratch "
-               << "directories in list: " << join(tmp_dirs, ",")
+               << "directories in list: " << join(tmp_dir_specifiers, ",")
                << ". See previous warnings for information on causes.";
   }
   return Status::OK();
@@ -170,7 +218,7 @@ void TmpFileMgr::NewFile(
   string unique_name = lexical_cast<string>(random_generator()());
   stringstream file_name;
   file_name << PrintId(file_group->unique_id()) << "_" << unique_name;
-  path new_file_path(tmp_dirs_[device_id]);
+  path new_file_path(tmp_dirs_[device_id].path);
   new_file_path /= file_name.str();
 
   new_file->reset(new File(file_group, device_id, new_file_path.string()));
@@ -180,7 +228,7 @@ string TmpFileMgr::GetTmpDirPath(DeviceId device_id) const {
   DCHECK(initialized_);
   DCHECK_GE(device_id, 0);
   DCHECK_LT(device_id, tmp_dirs_.size());
-  return tmp_dirs_[device_id];
+  return tmp_dirs_[device_id].path;
 }
 
 int TmpFileMgr::NumActiveTmpDevices() {
@@ -206,10 +254,17 @@ TmpFileMgr::File::File(FileGroup* file_group, DeviceId device_id, const string&
   DCHECK(file_group != nullptr);
 }
 
-void TmpFileMgr::File::AllocateSpace(int64_t num_bytes, int64_t* offset) {
+bool TmpFileMgr::File::AllocateSpace(int64_t num_bytes, int64_t* offset) {
   DCHECK_GT(num_bytes, 0);
+  TmpDir* dir = GetDir();
+  // Increment optimistically and roll back if the limit is exceeded.
+  if (dir->bytes_used_metric->Increment(num_bytes) > dir->bytes_limit) {
+    dir->bytes_used_metric->Increment(-num_bytes);
+    return false;
+  }
   *offset = bytes_allocated_;
   bytes_allocated_ += num_bytes;
+  return true;
 }
 
 int TmpFileMgr::File::AssignDiskQueue() const {
@@ -223,7 +278,13 @@ void TmpFileMgr::File::Blacklist(const ErrorMsg& msg) {
 
 Status TmpFileMgr::File::Remove() {
   // Remove the file if present (it may not be present if no writes completed).
-  return FileSystemUtil::RemovePaths({path_});
+  Status status = FileSystemUtil::RemovePaths({path_});
+  GetDir()->bytes_used_metric->Increment(-bytes_allocated_);
+  return status;
+}
+
+TmpFileMgr::TmpDir* TmpFileMgr::File::GetDir() {
+  return &file_group_->tmp_file_mgr_->tmp_dirs_[device_id_];
 }
 
 string TmpFileMgr::File::DebugString() {
@@ -272,7 +333,7 @@ Status TmpFileMgr::FileGroup::CreateFiles() {
     ++files_allocated;
   }
   DCHECK_EQ(tmp_files_.size(), files_allocated);
-  if (tmp_files_.size() == 0) return ScratchAllocationFailedStatus();
+  if (tmp_files_.size() == 0) return ScratchAllocationFailedStatus({});
   // Start allocating on a random device to avoid overloading the first device.
   next_allocation_index_ = rand() % tmp_files_.size();
   return Status::OK();
@@ -315,18 +376,27 @@ Status TmpFileMgr::FileGroup::AllocateSpace(
   // Lazily create the files on the first write.
   if (tmp_files_.empty()) RETURN_IF_ERROR(CreateFiles());
 
+  // Track the indices of any directories where we failed due to capacity. This is
+  // required for error reporting if we are totally out of capacity so that it's clear
+  // that some disks were at capacity.
+  vector<int> at_capacity_dirs;
+
   // Find the next physical file in round-robin order and allocate a range from it.
   for (int attempt = 0; attempt < tmp_files_.size(); ++attempt) {
-    *tmp_file = tmp_files_[next_allocation_index_].get();
+    int idx = next_allocation_index_;
     next_allocation_index_ = (next_allocation_index_ + 1) % tmp_files_.size();
+    *tmp_file = tmp_files_[idx].get();
     if ((*tmp_file)->is_blacklisted()) continue;
-    (*tmp_file)->AllocateSpace(scratch_range_bytes, file_offset);
+    if (!(*tmp_file)->AllocateSpace(scratch_range_bytes, file_offset)) {
+      at_capacity_dirs.push_back(idx);
+      continue;
+    }
     scratch_space_bytes_used_counter_->Add(scratch_range_bytes);
     tmp_file_mgr_->scratch_bytes_used_metric_->Increment(scratch_range_bytes);
     current_bytes_allocated_ += num_bytes;
     return Status::OK();
   }
-  return ScratchAllocationFailedStatus();
+  return ScratchAllocationFailedStatus(at_capacity_dirs);
 }
 
 void TmpFileMgr::FileGroup::RecycleFileRange(unique_ptr<WriteHandle> handle) {
@@ -484,11 +554,21 @@ Status TmpFileMgr::FileGroup::RecoverWriteError(
   return handle->RetryWrite(io_ctx_.get(), tmp_file, file_offset);
 }
 
-Status TmpFileMgr::FileGroup::ScratchAllocationFailedStatus() {
-  Status status(TErrorCode::SCRATCH_ALLOCATION_FAILED,
-        join(tmp_file_mgr_->tmp_dirs_, ","), GetBackendString(),
-        PrettyPrinter::PrintBytes(scratch_space_bytes_used_counter_->value()),
-        PrettyPrinter::PrintBytes(current_bytes_allocated_));
+Status TmpFileMgr::FileGroup::ScratchAllocationFailedStatus(
+    const vector<int>& at_capacity_dirs) {
+  vector<string> tmp_dir_paths;
+  for (TmpDir& tmp_dir : tmp_file_mgr_->tmp_dirs_) {
+    tmp_dir_paths.push_back(tmp_dir.path);
+  }
+  vector<string> at_capacity_dir_paths;
+  for (int dir_idx : at_capacity_dirs) {
+    at_capacity_dir_paths.push_back(tmp_file_mgr_->tmp_dirs_[dir_idx].path);
+  }
+  Status status(TErrorCode::SCRATCH_ALLOCATION_FAILED, join(tmp_dir_paths, ","),
+      GetBackendString(),
+      PrettyPrinter::PrintBytes(scratch_space_bytes_used_counter_->value()),
+      PrettyPrinter::PrintBytes(current_bytes_allocated_),
+      join(at_capacity_dir_paths, ","));
   // Include all previous errors that may have caused the failure.
   for (Status& err : scratch_errors_) status.MergeStatus(err);
   return status;
diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h
index 9a38e4f..70ec1d4 100644
--- a/be/src/runtime/tmp-file-mgr.h
+++ b/be/src/runtime/tmp-file-mgr.h
@@ -92,6 +92,22 @@ class TmpFileMgr {
   /// Same typedef as io::WriteRange::WriteDoneCallback.
   typedef std::function<void(const Status&)> WriteDoneCallback;
 
+  /// A configured temporary directory that TmpFileMgr allocates files in.
+  struct TmpDir {
+    TmpDir(const std::string& path, int64_t bytes_limit, IntGauge* bytes_used_metric)
+      : path(path), bytes_limit(bytes_limit), bytes_used_metric(bytes_used_metric) {}
+
+    /// Path to the temporary directory.
+    const std::string path;
+
+    /// Limit on bytes that should be written to this path. Set to maximum value
+    /// of int64_t if there is no limit.
+    int64_t const bytes_limit;
+
+    /// The current bytes of scratch used for this temporary directory.
+    IntGauge* const bytes_used_metric;
+  };
+
   /// Represents a group of temporary files - one per disk with a scratch directory. The
   /// total allocated bytes of the group can be bound by setting the space allocation
   /// limit. The owner of the FileGroup object is responsible for calling the Close()
@@ -202,8 +218,11 @@ class TmpFileMgr {
 
     /// Return a SCRATCH_ALLOCATION_FAILED error with the appropriate information,
     /// including scratch directories, the amount of scratch allocated and previous
-    /// errors that caused this failure. 'lock_' must be held by caller.
-    Status ScratchAllocationFailedStatus();
+    /// errors that caused this failure. If some directories were at capacity,
+    /// but had not encountered an error, the indices of these directories in
+    /// tmp_file_mgr_->tmp_dirs_ should be included in 'at_capacity_dirs'.
+    /// 'lock_' must be held by caller.
+    Status ScratchAllocationFailedStatus(const std::vector<int>& at_capacity_dirs);
 
     /// The TmpFileMgr it is associated with.
     TmpFileMgr* const tmp_file_mgr_;
@@ -390,9 +409,13 @@ class TmpFileMgr {
 
   /// Custom initialization - initializes with the provided list of directories.
   /// If one_dir_per_device is true, only use one temporary directory per device.
-  /// This interface is intended for testing purposes.
-  Status InitCustom(const std::vector<std::string>& tmp_dirs, bool one_dir_per_device,
+  /// This interface is intended for testing purposes. 'tmp_dir_specifiers'
+  /// use the command-line syntax, i.e. <path>[:<limit>]. The first variant takes
+  /// a comma-separated list, the second takes a vector.
+  Status InitCustom(const std::string& tmp_dirs_spec, bool one_dir_per_device,
       MetricGroup* metrics) WARN_UNUSED_RESULT;
+  Status InitCustom(const std::vector<std::string>& tmp_dir_specifiers,
+      bool one_dir_per_device, MetricGroup* metrics) WARN_UNUSED_RESULT;
 
   /// Return the scratch directory path for the device.
   std::string GetTmpDirPath(DeviceId device_id) const;
@@ -419,7 +442,7 @@ class TmpFileMgr {
   bool initialized_;
 
   /// The paths of the created tmp directories.
-  std::vector<std::string> tmp_dirs_;
+  std::vector<TmpDir> tmp_dirs_;
 
   /// Metrics to track active scratch directories.
   IntGauge* num_active_scratch_dirs_metric_;
diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc
index 8c02417..79f50cc 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -82,7 +82,8 @@ auto MakeTestOkFn(TQueryOptions& options, OptionDef<T> option_def) {
 template<typename T>
 auto MakeTestErrFn(TQueryOptions& options, OptionDef<T> option_def) {
   return [&options, option_def](const char* str) {
-    EXPECT_FALSE(SetQueryOption(option_def.option_name, str, &options, nullptr).ok());
+    EXPECT_FALSE(SetQueryOption(option_def.option_name, str, &options, nullptr).ok())
+      << option_def.option_name << " " << str;
   };
 }
 
@@ -110,7 +111,7 @@ void TestByteCaseSet(TQueryOptions& options,
     }
     TestError(to_string(range.lower_bound - 1).c_str());
     TestError(to_string(static_cast<uint64_t>(range.upper_bound) + 1).c_str());
-    TestError("1tb");
+    TestError("1pb");
     TestError("1%");
     TestError("1%B");
     TestError("1B%");
@@ -120,6 +121,7 @@ void TestByteCaseSet(TQueryOptions& options,
         {"1 B", 1},
         {"0Kb", 0},
         {"4G",  4ll * 1024 * 1024 * 1024},
+        {"4tb",  4ll * 1024 * 1024 * 1024 * 1024},
         {"-1M", -1024 * 1024}
     };
     for (const auto& value_def : common_values) {
@@ -307,7 +309,7 @@ TEST(QueryOptions, SetSpecialOptions) {
     TestOk("0", 0);
     TestOk("4GB", 4ll * 1024 * 1024 * 1024);
     TestError("-1MB");
-    TestError("1tb");
+    TestError("1pb");
     TestError("1%");
     TestError("1%B");
     TestError("1B%");
diff --git a/be/src/util/parse-util-test.cc b/be/src/util/parse-util-test.cc
index 6b8e7a5..c8fb4a0 100644
--- a/be/src/util/parse-util-test.cc
+++ b/be/src/util/parse-util-test.cc
@@ -35,6 +35,7 @@ TEST(ParseMemSpecs, Basic) {
   int64_t kilobytes = 1024;
   int64_t megabytes = 1024 * kilobytes;
   int64_t gigabytes = 1024 * megabytes;
+  int64_t terabytes = 1024 * gigabytes;
 
   bytes = ParseUtil::ParseMemSpec("1", &is_percent, MemInfo::physical_mem());
   ASSERT_EQ(1, bytes);
@@ -72,6 +73,14 @@ TEST(ParseMemSpecs, Basic) {
   ASSERT_EQ(12 * gigabytes, bytes);
   ASSERT_FALSE(is_percent);
 
+  bytes = ParseUtil::ParseMemSpec("8T", &is_percent, MemInfo::physical_mem());
+  ASSERT_EQ(8 * terabytes, bytes);
+  ASSERT_FALSE(is_percent);
+
+  bytes = ParseUtil::ParseMemSpec("12tb", &is_percent, MemInfo::physical_mem());
+  ASSERT_EQ(12 * terabytes, bytes);
+  ASSERT_FALSE(is_percent);
+
   bytes = ParseUtil::ParseMemSpec("13%", &is_percent, MemInfo::physical_mem());
   ASSERT_GT(bytes, 0);
   ASSERT_TRUE(is_percent);
@@ -91,6 +100,8 @@ TEST(ParseMemSpecs, Basic) {
   bad_values.push_back("1Bb");
   bad_values.push_back("1%%");
   bad_values.push_back("1.1");
+  bad_values.push_back("1pb");
+  bad_values.push_back("1eb");
   stringstream ss;
   ss << UINT64_MAX;
   bad_values.push_back(ss.str());
diff --git a/be/src/util/parse-util.cc b/be/src/util/parse-util.cc
index d17cf3e..75253d7 100644
--- a/be/src/util/parse-util.cc
+++ b/be/src/util/parse-util.cc
@@ -40,6 +40,12 @@ int64_t ParseUtil::ParseMemSpec(const string& mem_spec_str, bool* is_percent,
     number_str_len--;
   }
   switch (*suffix_char) {
+    case 't':
+    case 'T':
+      // Terabytes.
+      number_str_len--;
+      multiplier = 1024L * 1024L * 1024L * 1024L;
+      break;
     case 'g':
     case 'G':
       // Gigabytes.
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index 5e1070d..bb45c3d 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -308,7 +308,8 @@ error_codes = (
   ("SCRATCH_ALLOCATION_FAILED", 101, "Could not create files in any configured scratch "
    "directories (--scratch_dirs=$0) on backend '$1'. $2 of scratch is currently in "
    "use by this Impala Daemon ($3 by this query). See logs for previous errors that "
-   "may have prevented creating or writing scratch files."),
+   "may have prevented creating or writing scratch files. The following directories "
+   "were at capacity: $4"),
 
   ("SCRATCH_READ_TRUNCATED", 102, "Error reading $0 bytes from scratch file '$1' "
    "on backend $2 at offset $3: could only read $4 bytes"),
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index bd30674..1ce9264 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -2151,6 +2151,16 @@
     "key": "tmp-file-mgr.scratch-space-bytes-used-high-water-mark"
   },
   {
+    "description": "The current total spilled bytes for a single scratch directory.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Per-directory scratch space bytes used",
+    "units": "BYTES",
+    "kind": "GAUGE",
+    "key": "tmp-file-mgr.scratch-space-bytes-used.dir-$0"
+  },
+  {
     "description": "Number of senders waiting for receiving fragment to initialize",
     "contexts": [
       "IMPALAD"
diff --git a/docs/topics/impala_mem_limit.xml b/docs/topics/impala_mem_limit.xml
index 2bd89e5..d61edf3 100644
--- a/docs/topics/impala_mem_limit.xml
+++ b/docs/topics/impala_mem_limit.xml
@@ -167,10 +167,10 @@ MEM_LIMIT set to 3mb
     </p>
 
 <codeblock rev="">
-[localhost:21000] > set mem_limit=3tb;
-MEM_LIMIT set to 3tb
+[localhost:21000] > set mem_limit=3pb;
+MEM_LIMIT set to 3pb
 [localhost:21000] > select 5;
-ERROR: Failed to parse query memory limit from '3tb'.
+ERROR: Failed to parse query memory limit from '3pb'.
 
 [localhost:21000] > set mem_limit=xyz;
 MEM_LIMIT set to xyz