You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/08/07 03:27:19 UTC
[impala] 02/02: IMPALA-8376: directory limits for scratch usage
This is an automated email from the ASF dual-hosted git repository.
tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 411189a8d733a66c363c72f8c404123d68640a3e
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Fri Aug 2 16:37:02 2019 -0700
IMPALA-8376: directory limits for scratch usage
This extends the --scratch_dirs syntax to support specifying a max
capacity per directory, similarly to the --data_cache configuration.
The capacity is delimited from the directory name with ":" and
uses the usual syntax for specifying memory. The following are
valid arguments:
* --scratch_dirs=/dir1,/dir2 (no limits)
* --scratch_dirs=/dir1,/dir2:25G (only a limit on /dir2)
* --scratch_dirs=/dir1:5MB,/dir2 (only a limit on /dir1)
* --scratch_dirs=/dir1:-1,/dir2:0 (alternative ways of
expressing no limit)
The usage is tracked with a metric per directory. Allocations
from that directory start to fail when the limit is exceeded.
These metrics are exposed as
tmp-file-mgr.scratch-space-bytes-used.dir-0,
tmp-file-mgr.scratch-space-bytes-used.dir-1, etc.
Also add support for parsing terabyte specifiers to a utility
function that is used for parsing many configurations.
Testing:
Added a unit test to exercise TmpFileMgr.
Manually ran a spilling query on an impalad with multiple scratch dirs
configured with different limits. Confirmed via metrics that the
capacities were enforced.
Change-Id: I696146a65dbb97f1ba200ae472358ae2db6eb441
Reviewed-on: http://gerrit.cloudera.org:8080/13986
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/runtime/tmp-file-mgr-internal.h | 15 ++-
be/src/runtime/tmp-file-mgr-test.cc | 203 +++++++++++++++++++++++++++++++++
be/src/runtime/tmp-file-mgr.cc | 128 +++++++++++++++++----
be/src/runtime/tmp-file-mgr.h | 33 +++++-
be/src/service/query-options-test.cc | 8 +-
be/src/util/parse-util-test.cc | 11 ++
be/src/util/parse-util.cc | 6 +
common/thrift/generate_error_codes.py | 3 +-
common/thrift/metrics.json | 10 ++
docs/topics/impala_mem_limit.xml | 6 +-
10 files changed, 382 insertions(+), 41 deletions(-)
diff --git a/be/src/runtime/tmp-file-mgr-internal.h b/be/src/runtime/tmp-file-mgr-internal.h
index 5d11c4d..090a019 100644
--- a/be/src/runtime/tmp-file-mgr-internal.h
+++ b/be/src/runtime/tmp-file-mgr-internal.h
@@ -36,11 +36,13 @@ class TmpFileMgr::File {
public:
File(FileGroup* file_group, DeviceId device_id, const std::string& path);
- /// Allocates 'num_bytes' bytes in this file for a new block of data.
- /// The file size is increased by a call to truncate() if necessary.
- /// Sets 'offset' to the file offset of the first byte in the allocated
+ /// Allocates 'num_bytes' bytes in this file for a new block of data if there is
+ /// free capacity in this temporary directory. If there is insufficient capacity,
+ /// return false. Otherwise, update state and return true.
+ /// This function does not actually perform any file operations.
+ /// Sets 'offset' to the file offset of the first byte in the allocated
/// range on success.
- void AllocateSpace(int64_t num_bytes, int64_t* offset);
+ bool AllocateSpace(int64_t num_bytes, int64_t* offset);
/// Called when an IO error is encountered for this file. Logs the error and blacklists
/// the file.
@@ -85,7 +87,10 @@ class TmpFileMgr::File {
/// Set to true to indicate that we shouldn't allocate any more space in this file.
bool blacklisted_;
+
+ /// Helper to get the TmpDir that this file is associated with.
+ TmpDir* GetDir();
};
-}
+} // namespace impala
#endif
diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc
index 5bed3b4..9b852a6 100644
--- a/be/src/runtime/tmp-file-mgr-test.cc
+++ b/be/src/runtime/tmp-file-mgr-test.cc
@@ -17,6 +17,7 @@
#include <cstdio>
#include <cstdlib>
+#include <limits>
#include <numeric>
#include <boost/filesystem.hpp>
@@ -52,6 +53,11 @@ namespace impala {
using namespace io;
+static const int64_t KILOBYTE = 1024L;
+static const int64_t MEGABYTE = 1024L * KILOBYTE;
+static const int64_t GIGABYTE = 1024L * MEGABYTE;
+static const int64_t TERABYTE = 1024L * GIGABYTE;
+
class TmpFileMgrTest : public ::testing::Test {
public:
virtual void SetUp() {
@@ -76,6 +82,18 @@ class TmpFileMgrTest : public ::testing::Test {
DiskIoMgr* io_mgr() { return test_env_->exec_env()->disk_io_mgr(); }
+ /// Helper to create a TmpFileMgr and initialise it with InitCustom(). Adds the mgr to
+ /// 'obj_pool_' for automatic cleanup at the end of each test. Fails the test if
+ /// InitCustom() fails.
+ TmpFileMgr* CreateTmpFileMgr(const string& tmp_dirs_spec) {
+ // Allocate a new metrics group for each TmpFileMgr so they don't get confused by
+ // the pre-existing metrics (TmpFileMgr assumes it's a singleton in product code).
+ MetricGroup* metrics = obj_pool_.Add(new MetricGroup(""));
+ TmpFileMgr* mgr = obj_pool_.Add(new TmpFileMgr());
+ EXPECT_OK(mgr->InitCustom(tmp_dirs_spec, false, metrics));
+ return mgr;
+ }
+
/// Check that metric values are consistent with TmpFileMgr state.
void CheckMetrics(TmpFileMgr* tmp_file_mgr) {
vector<TmpFileMgr::DeviceId> active = tmp_file_mgr->ActiveTmpDevices();
@@ -122,6 +140,11 @@ class TmpFileMgrTest : public ::testing::Test {
return Status::OK();
}
+ /// Helper to get the private tmp_dirs_ member.
+ static const vector<TmpFileMgr::TmpDir>& GetTmpDirs(TmpFileMgr* mgr) {
+ return mgr->tmp_dirs_;
+ }
+
/// Helper to call the private TmpFileMgr::NewFile() method.
static void NewFile(TmpFileMgr* mgr, TmpFileMgr::FileGroup* group,
TmpFileMgr::DeviceId device_id, unique_ptr<TmpFileMgr::File>* new_file) {
@@ -644,4 +667,184 @@ TEST_F(TmpFileMgrTest, TestHWMMetric) {
file_group_2.Close();
checkHWMMetrics(0, 2 * LIMIT);
}
+
+// Test that usage per directory is tracked correctly and per-directory limits are
+// enforced. Sets up several scratch directories, some with limits, and checks
+// that the allocations occur in the right directories.
+TEST_F(TmpFileMgrTest, TestDirectoryLimits) {
+ vector<string> tmp_dirs({"/tmp/tmp-file-mgr-test.1", "/tmp/tmp-file-mgr-test.2",
+ "/tmp/tmp-file-mgr-test.3"});
+ vector<string> tmp_dir_specs({"/tmp/tmp-file-mgr-test.1:512",
+ "/tmp/tmp-file-mgr-test.2:1k", "/tmp/tmp-file-mgr-test.3"});
+ RemoveAndCreateDirs(tmp_dirs);
+ TmpFileMgr tmp_file_mgr;
+ ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dir_specs, false, metrics_.get()));
+
+ TmpFileMgr::FileGroup file_group_1(
+ &tmp_file_mgr, io_mgr(), RuntimeProfile::Create(&obj_pool_, "p1"), TUniqueId());
+ TmpFileMgr::FileGroup file_group_2(
+ &tmp_file_mgr, io_mgr(), RuntimeProfile::Create(&obj_pool_, "p2"), TUniqueId());
+
+ vector<TmpFileMgr::File*> files;
+ ASSERT_OK(CreateFiles(&file_group_1, &files));
+ ASSERT_OK(CreateFiles(&file_group_2, &files));
+
+ IntGauge* dir1_usage = metrics_->FindMetricForTesting<IntGauge>(
+ "tmp-file-mgr.scratch-space-bytes-used.dir-0");
+ IntGauge* dir2_usage = metrics_->FindMetricForTesting<IntGauge>(
+ "tmp-file-mgr.scratch-space-bytes-used.dir-1");
+ IntGauge* dir3_usage = metrics_->FindMetricForTesting<IntGauge>(
+ "tmp-file-mgr.scratch-space-bytes-used.dir-2");
+
+ // A power-of-two so that FileGroup allocates exactly this amount of scratch space.
+ const int64_t ALLOC_SIZE = 512;
+ int64_t offset;
+ TmpFileMgr::File* alloc_file;
+
+ // Allocate three times - once per directory. We expect these allocations to go through
+ // so we should have one allocation in each directory.
+ SetNextAllocationIndex(&file_group_1, 0);
+ for (int i = 0; i < tmp_dir_specs.size(); ++i) {
+ ASSERT_OK(GroupAllocateSpace(&file_group_1, ALLOC_SIZE, &alloc_file, &offset));
+ }
+ EXPECT_EQ(ALLOC_SIZE, dir1_usage->GetValue());
+ EXPECT_EQ(ALLOC_SIZE, dir2_usage->GetValue());
+ EXPECT_EQ(ALLOC_SIZE, dir3_usage->GetValue());
+
+ // This time we should hit the limit on the first directory. Do this from a
+ // different file group to show that limits are enforced across file groups.
+ for (int i = 0; i < 2; ++i) {
+ ASSERT_OK(GroupAllocateSpace(&file_group_2, ALLOC_SIZE, &alloc_file, &offset));
+ }
+ EXPECT_EQ(ALLOC_SIZE, dir1_usage->GetValue());
+ EXPECT_EQ(2 * ALLOC_SIZE, dir2_usage->GetValue());
+ EXPECT_EQ(2 * ALLOC_SIZE, dir3_usage->GetValue());
+
+ // Now we're at the limits on two directories, all allocations should go to the
+ // last directory without a limit.
+ for (int i = 0; i < 100; ++i) {
+ ASSERT_OK(GroupAllocateSpace(&file_group_2, ALLOC_SIZE, &alloc_file, &offset));
+ }
+ EXPECT_EQ(ALLOC_SIZE, dir1_usage->GetValue());
+ EXPECT_EQ(2 * ALLOC_SIZE, dir2_usage->GetValue());
+ EXPECT_EQ(102 * ALLOC_SIZE, dir3_usage->GetValue());
+
+ file_group_2.Close();
+ // Metrics should be decremented when the file groups delete the underlying files.
+ EXPECT_EQ(ALLOC_SIZE, dir1_usage->GetValue());
+ EXPECT_EQ(ALLOC_SIZE, dir2_usage->GetValue());
+ EXPECT_EQ(ALLOC_SIZE, dir3_usage->GetValue());
+
+ // We should be able to reuse the space freed up.
+ ASSERT_OK(GroupAllocateSpace(&file_group_1, ALLOC_SIZE, &alloc_file, &offset));
+
+ EXPECT_EQ(2 * ALLOC_SIZE, dir2_usage->GetValue());
+ file_group_1.Close();
+ EXPECT_EQ(0, dir1_usage->GetValue());
+ EXPECT_EQ(0, dir2_usage->GetValue());
+ EXPECT_EQ(0, dir3_usage->GetValue());
+}
+
+// Test the case when all per-directory limits are hit. We expect to return a status
+// and fail gracefully.
+TEST_F(TmpFileMgrTest, TestDirectoryLimitsExhausted) {
+ vector<string> tmp_dirs({"/tmp/tmp-file-mgr-test.1", "/tmp/tmp-file-mgr-test.2"});
+ vector<string> tmp_dir_specs(
+ {"/tmp/tmp-file-mgr-test.1:256kb", "/tmp/tmp-file-mgr-test.2:1mb"});
+ const int64_t DIR1_LIMIT = 256L * 1024L;
+ const int64_t DIR2_LIMIT = 1024L * 1024L;
+ RemoveAndCreateDirs(tmp_dirs);
+ TmpFileMgr tmp_file_mgr;
+ ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dir_specs, false, metrics_.get()));
+
+ TmpFileMgr::FileGroup file_group_1(
+ &tmp_file_mgr, io_mgr(), RuntimeProfile::Create(&obj_pool_, "p1"), TUniqueId());
+ TmpFileMgr::FileGroup file_group_2(
+ &tmp_file_mgr, io_mgr(), RuntimeProfile::Create(&obj_pool_, "p2"), TUniqueId());
+
+ vector<TmpFileMgr::File*> files;
+ ASSERT_OK(CreateFiles(&file_group_1, &files));
+ ASSERT_OK(CreateFiles(&file_group_2, &files));
+
+ IntGauge* dir1_usage = metrics_->FindMetricForTesting<IntGauge>(
+ "tmp-file-mgr.scratch-space-bytes-used.dir-0");
+ IntGauge* dir2_usage = metrics_->FindMetricForTesting<IntGauge>(
+ "tmp-file-mgr.scratch-space-bytes-used.dir-1");
+
+ // A power-of-two so that FileGroup allocates exactly this amount of scratch space.
+ const int64_t ALLOC_SIZE = 512;
+ const int64_t MAX_ALLOCATIONS = (DIR1_LIMIT + DIR2_LIMIT) / ALLOC_SIZE;
+ int64_t offset;
+ TmpFileMgr::File* alloc_file;
+
+ // Allocate exactly the maximum total capacity of the directories.
+ SetNextAllocationIndex(&file_group_1, 0);
+ for (int i = 0; i < MAX_ALLOCATIONS; ++i) {
+ ASSERT_OK(GroupAllocateSpace(&file_group_1, ALLOC_SIZE, &alloc_file, &offset));
+ }
+ EXPECT_EQ(DIR1_LIMIT, dir1_usage->GetValue());
+ EXPECT_EQ(DIR2_LIMIT, dir2_usage->GetValue());
+ // The directories are at capacity, so allocations should fail.
+ Status err1 = GroupAllocateSpace(&file_group_1, ALLOC_SIZE, &alloc_file, &offset);
+ ASSERT_EQ(err1.code(), TErrorCode::SCRATCH_ALLOCATION_FAILED);
+ Status err2 = GroupAllocateSpace(&file_group_2, ALLOC_SIZE, &alloc_file, &offset);
+ ASSERT_EQ(err2.code(), TErrorCode::SCRATCH_ALLOCATION_FAILED);
+
+ // A FileGroup should recover once allocations are released, i.e. it does not
+ // permanently block allocating files from the group.
+ file_group_1.Close();
+ ASSERT_OK(GroupAllocateSpace(&file_group_2, ALLOC_SIZE, &alloc_file, &offset));
+ file_group_2.Close();
+}
+
+// Test the directory parsing logic, including the various error cases.
+TEST_F(TmpFileMgrTest, TestDirectoryLimitParsing) {
+ RemoveAndCreateDirs({"/tmp/tmp-file-mgr-test1", "/tmp/tmp-file-mgr-test2",
+ "/tmp/tmp-file-mgr-test3", "/tmp/tmp-file-mgr-test4", "/tmp/tmp-file-mgr-test5",
+ "/tmp/tmp-file-mgr-test6", "/tmp/tmp-file-mgr-test7"});
+ // Configure various directories with valid formats.
+ auto& dirs = GetTmpDirs(
+ CreateTmpFileMgr("/tmp/tmp-file-mgr-test1:5g,/tmp/tmp-file-mgr-test2,"
+ "/tmp/tmp-file-mgr-test3:1234,/tmp/tmp-file-mgr-test4:99999999,"
+ "/tmp/tmp-file-mgr-test5:200tb,/tmp/tmp-file-mgr-test6:100MB"));
+ EXPECT_EQ(6, dirs.size());
+ EXPECT_EQ(5 * GIGABYTE, dirs[0].bytes_limit);
+ EXPECT_EQ(numeric_limits<int64_t>::max(), dirs[1].bytes_limit);
+ EXPECT_EQ(1234, dirs[2].bytes_limit);
+ EXPECT_EQ(99999999, dirs[3].bytes_limit);
+ EXPECT_EQ(200 * TERABYTE, dirs[4].bytes_limit);
+ EXPECT_EQ(100 * MEGABYTE, dirs[5].bytes_limit);
+
+ // Various invalid limit formats result in the directory getting skipped.
+ // Include a valid dir on the end to ensure that we don't short-circuit all
+ // directories.
+ auto& dirs2 = GetTmpDirs(
+ CreateTmpFileMgr("/tmp/tmp-file-mgr-test1:foo,/tmp/tmp-file-mgr-test2:?,"
+ "/tmp/tmp-file-mgr-test3:1.2.3.4,/tmp/tmp-file-mgr-test4: ,"
+ "/tmp/tmp-file-mgr-test5:5pb,/tmp/tmp-file-mgr-test6:10%,"
+ "/tmp/tmp-file-mgr-test1:100"));
+ EXPECT_EQ(1, dirs2.size());
+ EXPECT_EQ("/tmp/tmp-file-mgr-test1/impala-scratch", dirs2[0].path);
+ EXPECT_EQ(100, dirs2[0].bytes_limit);
+
+ // Various valid ways of specifying "unlimited".
+ auto& dirs3 =
+ GetTmpDirs(CreateTmpFileMgr("/tmp/tmp-file-mgr-test1:,/tmp/tmp-file-mgr-test2:-1,"
+ "/tmp/tmp-file-mgr-test3,/tmp/tmp-file-mgr-test4:0"));
+ EXPECT_EQ(4, dirs3.size());
+ for (const auto& dir : dirs3) {
+ EXPECT_EQ(numeric_limits<int64_t>::max(), dir.bytes_limit);
+ }
+
+ // Extra colons
+ auto& dirs4 = GetTmpDirs(
+ CreateTmpFileMgr("/tmp/tmp-file-mgr-test1:1:,/tmp/tmp-file-mgr-test2:10mb::"));
+ EXPECT_EQ(0, dirs4.size());
+
+ // Empty strings.
+ auto& nodirs = GetTmpDirs(CreateTmpFileMgr(""));
+ EXPECT_EQ(0, nodirs.size());
+ auto& empty_paths = GetTmpDirs(CreateTmpFileMgr(","));
+ EXPECT_EQ(2, empty_paths.size());
}
+} // namespace impala
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index 400d4d6..7954a62 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -17,6 +17,8 @@
#include "runtime/tmp-file-mgr.h"
+#include <limits>
+
#include <boost/algorithm/string.hpp>
#include <boost/filesystem.hpp>
#include <boost/lexical_cast.hpp>
@@ -35,6 +37,7 @@
#include "util/debug-util.h"
#include "util/disk-info.h"
#include "util/filesystem-util.h"
+#include "util/parse-util.h"
#include "util/pretty-printer.h"
#include "util/runtime-profile-counters.h"
@@ -43,7 +46,14 @@
DEFINE_bool(disk_spill_encryption, true,
"Set this to encrypt and perform an integrity "
"check on all data spilled to disk during a query");
-DEFINE_string(scratch_dirs, "/tmp", "Writable scratch directories");
+DEFINE_string(scratch_dirs, "/tmp",
+ "Writable scratch directories. "
+ "This is a comma-separated list of directories. Each directory is "
+ "specified as the directory path and an optional limit on the bytes that will "
+ "be allocated in that directory. If the optional limit is provided, the path and "
+ "the limit are separated by a colon. E.g. '/dir1:10G,/dir2:5GB,/dir3' will allow "
+ "allocating up to 10GB of scratch in /dir1, 5GB of scratch in /dir2 and an "
+ "unlimited amount in /dir3.");
DEFINE_bool(allow_multiple_scratch_dirs_per_device, true,
"If false and --scratch_dirs contains multiple directories on the same device, "
"then only the first writable directory is used");
@@ -71,6 +81,8 @@ const string TMP_FILE_MGR_SCRATCH_SPACE_BYTES_USED_HIGH_WATER_MARK =
"tmp-file-mgr.scratch-space-bytes-used-high-water-mark";
const string TMP_FILE_MGR_SCRATCH_SPACE_BYTES_USED =
"tmp-file-mgr.scratch-space-bytes-used";
+const string SCRATCH_DIR_BYTES_USED_FORMAT =
+ "tmp-file-mgr.scratch-space-bytes-used.dir-$0";
TmpFileMgr::TmpFileMgr()
: initialized_(false),
@@ -79,27 +91,61 @@ TmpFileMgr::TmpFileMgr()
scratch_bytes_used_metric_(nullptr) {}
Status TmpFileMgr::Init(MetricGroup* metrics) {
- string tmp_dirs_spec = FLAGS_scratch_dirs;
+ return InitCustom(
+ FLAGS_scratch_dirs, !FLAGS_allow_multiple_scratch_dirs_per_device, metrics);
+}
+
+Status TmpFileMgr::InitCustom(
+ const string& tmp_dirs_spec, bool one_dir_per_device, MetricGroup* metrics) {
vector<string> all_tmp_dirs;
// Empty string should be interpreted as no scratch
if (!tmp_dirs_spec.empty()) {
split(all_tmp_dirs, tmp_dirs_spec, is_any_of(","), token_compress_on);
}
- return InitCustom(all_tmp_dirs, !FLAGS_allow_multiple_scratch_dirs_per_device, metrics);
+ return InitCustom(all_tmp_dirs, one_dir_per_device, metrics);
}
-Status TmpFileMgr::InitCustom(const vector<string>& tmp_dirs, bool one_dir_per_device,
- MetricGroup* metrics) {
+Status TmpFileMgr::InitCustom(const vector<string>& tmp_dir_specifiers,
+ bool one_dir_per_device, MetricGroup* metrics) {
DCHECK(!initialized_);
- if (tmp_dirs.empty()) {
+ if (tmp_dir_specifiers.empty()) {
LOG(WARNING) << "Running without spill to disk: no scratch directories provided.";
}
+ vector<TmpDir> tmp_dirs;
+ // Parse the directory specifiers. Don't return an error on parse errors, just log a
+ // warning - we don't want to abort process startup because of misconfigured scratch,
+ // since queries will generally still be runnable.
+ for (const string& tmp_dir_spec : tmp_dir_specifiers) {
+ vector<string> toks;
+ split(toks, tmp_dir_spec, is_any_of(":"), token_compress_on);
+ if (toks.size() > 2) {
+ LOG(ERROR) << "Could not parse temporary dir specifier, too many colons: '"
+ << tmp_dir_spec << "'";
+ continue;
+ }
+ int64_t bytes_limit = numeric_limits<int64_t>::max();
+ if (toks.size() == 2) {
+ bool is_percent;
+ bytes_limit = ParseUtil::ParseMemSpec(toks[1], &is_percent, 0);
+ if (bytes_limit < 0 || is_percent) {
+ LOG(ERROR) << "Malformed data cache capacity configuration '" << tmp_dir_spec
+ << "'";
+ continue;
+ } else if (bytes_limit == 0) {
+ // Interpret -1, 0 or empty string as no limit.
+ bytes_limit = numeric_limits<int64_t>::max();
+ }
+ }
+ IntGauge* bytes_used_metric = metrics->AddGauge(
+ SCRATCH_DIR_BYTES_USED_FORMAT, 0, Substitute("$0", tmp_dirs.size()));
+ tmp_dirs.emplace_back(toks[0], bytes_limit, bytes_used_metric);
+ }
vector<bool> is_tmp_dir_on_disk(DiskInfo::num_disks(), false);
// For each tmp directory, find the disk it is on,
// so additional tmp directories on the same disk can be skipped.
for (int i = 0; i < tmp_dirs.size(); ++i) {
- path tmp_path(trim_right_copy_if(tmp_dirs[i], is_any_of("/")));
+ path tmp_path(trim_right_copy_if(tmp_dirs[i].path, is_any_of("/")));
tmp_path = absolute(tmp_path);
path scratch_subdir_path(tmp_path / TMP_SUB_DIR_NAME);
// tmp_path must be a writable directory.
@@ -127,8 +173,10 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dirs, bool one_dir_per_d
if (status.ok()) {
if (disk_id >= 0) is_tmp_dir_on_disk[disk_id] = true;
LOG(INFO) << "Using scratch directory " << scratch_subdir_path.string() << " on "
- << "disk " << disk_id;
- tmp_dirs_.push_back(scratch_subdir_path.string());
+ << "disk " << disk_id
+ << " limit: " << PrettyPrinter::PrintBytes(tmp_dirs[i].bytes_limit);
+ tmp_dirs_.emplace_back(scratch_subdir_path.string(), tmp_dirs[i].bytes_limit,
+ tmp_dirs[i].bytes_used_metric);
} else {
LOG(WARNING) << "Could not remove and recreate directory "
<< scratch_subdir_path.string() << ": cannot use it for scratch. "
@@ -144,7 +192,7 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dirs, bool one_dir_per_d
metrics, TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS_LIST, set<string>());
num_active_scratch_dirs_metric_->SetValue(tmp_dirs_.size());
for (int i = 0; i < tmp_dirs_.size(); ++i) {
- active_scratch_dirs_metric_->Add(tmp_dirs_[i]);
+ active_scratch_dirs_metric_->Add(tmp_dirs_[i].path);
}
scratch_bytes_used_metric_ =
metrics->AddHWMGauge(TMP_FILE_MGR_SCRATCH_SPACE_BYTES_USED_HIGH_WATER_MARK,
@@ -154,7 +202,7 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dirs, bool one_dir_per_d
if (tmp_dirs_.empty() && !tmp_dirs.empty()) {
LOG(ERROR) << "Running without spill to disk: could not use any scratch "
- << "directories in list: " << join(tmp_dirs, ",")
+ << "directories in list: " << join(tmp_dir_specifiers, ",")
<< ". See previous warnings for information on causes.";
}
return Status::OK();
@@ -170,7 +218,7 @@ void TmpFileMgr::NewFile(
string unique_name = lexical_cast<string>(random_generator()());
stringstream file_name;
file_name << PrintId(file_group->unique_id()) << "_" << unique_name;
- path new_file_path(tmp_dirs_[device_id]);
+ path new_file_path(tmp_dirs_[device_id].path);
new_file_path /= file_name.str();
new_file->reset(new File(file_group, device_id, new_file_path.string()));
@@ -180,7 +228,7 @@ string TmpFileMgr::GetTmpDirPath(DeviceId device_id) const {
DCHECK(initialized_);
DCHECK_GE(device_id, 0);
DCHECK_LT(device_id, tmp_dirs_.size());
- return tmp_dirs_[device_id];
+ return tmp_dirs_[device_id].path;
}
int TmpFileMgr::NumActiveTmpDevices() {
@@ -206,10 +254,17 @@ TmpFileMgr::File::File(FileGroup* file_group, DeviceId device_id, const string&
DCHECK(file_group != nullptr);
}
-void TmpFileMgr::File::AllocateSpace(int64_t num_bytes, int64_t* offset) {
+bool TmpFileMgr::File::AllocateSpace(int64_t num_bytes, int64_t* offset) {
DCHECK_GT(num_bytes, 0);
+ TmpDir* dir = GetDir();
+ // Increment optimistically and roll back if the limit is exceeded.
+ if (dir->bytes_used_metric->Increment(num_bytes) > dir->bytes_limit) {
+ dir->bytes_used_metric->Increment(-num_bytes);
+ return false;
+ }
*offset = bytes_allocated_;
bytes_allocated_ += num_bytes;
+ return true;
}
int TmpFileMgr::File::AssignDiskQueue() const {
@@ -223,7 +278,13 @@ void TmpFileMgr::File::Blacklist(const ErrorMsg& msg) {
Status TmpFileMgr::File::Remove() {
// Remove the file if present (it may not be present if no writes completed).
- return FileSystemUtil::RemovePaths({path_});
+ Status status = FileSystemUtil::RemovePaths({path_});
+ GetDir()->bytes_used_metric->Increment(-bytes_allocated_);
+ return status;
+}
+
+TmpFileMgr::TmpDir* TmpFileMgr::File::GetDir() {
+ return &file_group_->tmp_file_mgr_->tmp_dirs_[device_id_];
}
string TmpFileMgr::File::DebugString() {
@@ -272,7 +333,7 @@ Status TmpFileMgr::FileGroup::CreateFiles() {
++files_allocated;
}
DCHECK_EQ(tmp_files_.size(), files_allocated);
- if (tmp_files_.size() == 0) return ScratchAllocationFailedStatus();
+ if (tmp_files_.size() == 0) return ScratchAllocationFailedStatus({});
// Start allocating on a random device to avoid overloading the first device.
next_allocation_index_ = rand() % tmp_files_.size();
return Status::OK();
@@ -315,18 +376,27 @@ Status TmpFileMgr::FileGroup::AllocateSpace(
// Lazily create the files on the first write.
if (tmp_files_.empty()) RETURN_IF_ERROR(CreateFiles());
+ // Track the indices of any directories where we failed due to capacity. This is
+ // required for error reporting if we are totally out of capacity so that it's clear
+ // that some disks were at capacity.
+ vector<int> at_capacity_dirs;
+
// Find the next physical file in round-robin order and allocate a range from it.
for (int attempt = 0; attempt < tmp_files_.size(); ++attempt) {
- *tmp_file = tmp_files_[next_allocation_index_].get();
+ int idx = next_allocation_index_;
next_allocation_index_ = (next_allocation_index_ + 1) % tmp_files_.size();
+ *tmp_file = tmp_files_[idx].get();
if ((*tmp_file)->is_blacklisted()) continue;
- (*tmp_file)->AllocateSpace(scratch_range_bytes, file_offset);
+ if (!(*tmp_file)->AllocateSpace(scratch_range_bytes, file_offset)) {
+ at_capacity_dirs.push_back(idx);
+ continue;
+ }
scratch_space_bytes_used_counter_->Add(scratch_range_bytes);
tmp_file_mgr_->scratch_bytes_used_metric_->Increment(scratch_range_bytes);
current_bytes_allocated_ += num_bytes;
return Status::OK();
}
- return ScratchAllocationFailedStatus();
+ return ScratchAllocationFailedStatus(at_capacity_dirs);
}
void TmpFileMgr::FileGroup::RecycleFileRange(unique_ptr<WriteHandle> handle) {
@@ -484,11 +554,21 @@ Status TmpFileMgr::FileGroup::RecoverWriteError(
return handle->RetryWrite(io_ctx_.get(), tmp_file, file_offset);
}
-Status TmpFileMgr::FileGroup::ScratchAllocationFailedStatus() {
- Status status(TErrorCode::SCRATCH_ALLOCATION_FAILED,
- join(tmp_file_mgr_->tmp_dirs_, ","), GetBackendString(),
- PrettyPrinter::PrintBytes(scratch_space_bytes_used_counter_->value()),
- PrettyPrinter::PrintBytes(current_bytes_allocated_));
+Status TmpFileMgr::FileGroup::ScratchAllocationFailedStatus(
+ const vector<int>& at_capacity_dirs) {
+ vector<string> tmp_dir_paths;
+ for (TmpDir& tmp_dir : tmp_file_mgr_->tmp_dirs_) {
+ tmp_dir_paths.push_back(tmp_dir.path);
+ }
+ vector<string> at_capacity_dir_paths;
+ for (int dir_idx : at_capacity_dirs) {
+ at_capacity_dir_paths.push_back(tmp_file_mgr_->tmp_dirs_[dir_idx].path);
+ }
+ Status status(TErrorCode::SCRATCH_ALLOCATION_FAILED, join(tmp_dir_paths, ","),
+ GetBackendString(),
+ PrettyPrinter::PrintBytes(scratch_space_bytes_used_counter_->value()),
+ PrettyPrinter::PrintBytes(current_bytes_allocated_),
+ join(at_capacity_dir_paths, ","));
// Include all previous errors that may have caused the failure.
for (Status& err : scratch_errors_) status.MergeStatus(err);
return status;
diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h
index 9a38e4f..70ec1d4 100644
--- a/be/src/runtime/tmp-file-mgr.h
+++ b/be/src/runtime/tmp-file-mgr.h
@@ -92,6 +92,22 @@ class TmpFileMgr {
/// Same typedef as io::WriteRange::WriteDoneCallback.
typedef std::function<void(const Status&)> WriteDoneCallback;
+ /// A configured temporary directory that TmpFileMgr allocates files in.
+ struct TmpDir {
+ TmpDir(const std::string& path, int64_t bytes_limit, IntGauge* bytes_used_metric)
+ : path(path), bytes_limit(bytes_limit), bytes_used_metric(bytes_used_metric) {}
+
+ /// Path to the temporary directory.
+ const std::string path;
+
+ /// Limit on bytes that should be written to this path. Set to maximum value
+ /// of int64_t if there is no limit.
+ int64_t const bytes_limit;
+
+ /// The current bytes of scratch used for this temporary directory.
+ IntGauge* const bytes_used_metric;
+ };
+
/// Represents a group of temporary files - one per disk with a scratch directory. The
/// total allocated bytes of the group can be bound by setting the space allocation
/// limit. The owner of the FileGroup object is responsible for calling the Close()
@@ -202,8 +218,11 @@ class TmpFileMgr {
/// Return a SCRATCH_ALLOCATION_FAILED error with the appropriate information,
/// including scratch directories, the amount of scratch allocated and previous
- /// errors that caused this failure. 'lock_' must be held by caller.
- Status ScratchAllocationFailedStatus();
+ /// errors that caused this failure. If some directories were at capacity,
+ /// but had not encountered an error, the indices of these directories in
+ /// tmp_file_mgr_->tmp_dirs_ should be included in 'at_capacity_dirs'.
+ /// 'lock_' must be held by caller.
+ Status ScratchAllocationFailedStatus(const std::vector<int>& at_capacity_dirs);
/// The TmpFileMgr it is associated with.
TmpFileMgr* const tmp_file_mgr_;
@@ -390,9 +409,13 @@ class TmpFileMgr {
/// Custom initialization - initializes with the provided list of directories.
/// If one_dir_per_device is true, only use one temporary directory per device.
- /// This interface is intended for testing purposes.
- Status InitCustom(const std::vector<std::string>& tmp_dirs, bool one_dir_per_device,
+ /// This interface is intended for testing purposes. 'tmp_dir_specifiers'
+ /// use the command-line syntax, i.e. <path>[:<limit>]. The first variant takes
+ /// a comma-separated list, the second takes a vector.
+ Status InitCustom(const std::string& tmp_dirs_spec, bool one_dir_per_device,
MetricGroup* metrics) WARN_UNUSED_RESULT;
+ Status InitCustom(const std::vector<std::string>& tmp_dir_specifiers,
+ bool one_dir_per_device, MetricGroup* metrics) WARN_UNUSED_RESULT;
/// Return the scratch directory path for the device.
std::string GetTmpDirPath(DeviceId device_id) const;
@@ -419,7 +442,7 @@ class TmpFileMgr {
bool initialized_;
/// The paths of the created tmp directories.
- std::vector<std::string> tmp_dirs_;
+ std::vector<TmpDir> tmp_dirs_;
/// Metrics to track active scratch directories.
IntGauge* num_active_scratch_dirs_metric_;
diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc
index 8c02417..79f50cc 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -82,7 +82,8 @@ auto MakeTestOkFn(TQueryOptions& options, OptionDef<T> option_def) {
template<typename T>
auto MakeTestErrFn(TQueryOptions& options, OptionDef<T> option_def) {
return [&options, option_def](const char* str) {
- EXPECT_FALSE(SetQueryOption(option_def.option_name, str, &options, nullptr).ok());
+ EXPECT_FALSE(SetQueryOption(option_def.option_name, str, &options, nullptr).ok())
+ << option_def.option_name << " " << str;
};
}
@@ -110,7 +111,7 @@ void TestByteCaseSet(TQueryOptions& options,
}
TestError(to_string(range.lower_bound - 1).c_str());
TestError(to_string(static_cast<uint64_t>(range.upper_bound) + 1).c_str());
- TestError("1tb");
+ TestError("1pb");
TestError("1%");
TestError("1%B");
TestError("1B%");
@@ -120,6 +121,7 @@ void TestByteCaseSet(TQueryOptions& options,
{"1 B", 1},
{"0Kb", 0},
{"4G", 4ll * 1024 * 1024 * 1024},
+ {"4tb", 4ll * 1024 * 1024 * 1024 * 1024},
{"-1M", -1024 * 1024}
};
for (const auto& value_def : common_values) {
@@ -307,7 +309,7 @@ TEST(QueryOptions, SetSpecialOptions) {
TestOk("0", 0);
TestOk("4GB", 4ll * 1024 * 1024 * 1024);
TestError("-1MB");
- TestError("1tb");
+ TestError("1pb");
TestError("1%");
TestError("1%B");
TestError("1B%");
diff --git a/be/src/util/parse-util-test.cc b/be/src/util/parse-util-test.cc
index 6b8e7a5..c8fb4a0 100644
--- a/be/src/util/parse-util-test.cc
+++ b/be/src/util/parse-util-test.cc
@@ -35,6 +35,7 @@ TEST(ParseMemSpecs, Basic) {
int64_t kilobytes = 1024;
int64_t megabytes = 1024 * kilobytes;
int64_t gigabytes = 1024 * megabytes;
+ int64_t terabytes = 1024 * gigabytes;
bytes = ParseUtil::ParseMemSpec("1", &is_percent, MemInfo::physical_mem());
ASSERT_EQ(1, bytes);
@@ -72,6 +73,14 @@ TEST(ParseMemSpecs, Basic) {
ASSERT_EQ(12 * gigabytes, bytes);
ASSERT_FALSE(is_percent);
+ bytes = ParseUtil::ParseMemSpec("8T", &is_percent, MemInfo::physical_mem());
+ ASSERT_EQ(8 * terabytes, bytes);
+ ASSERT_FALSE(is_percent);
+
+ bytes = ParseUtil::ParseMemSpec("12tb", &is_percent, MemInfo::physical_mem());
+ ASSERT_EQ(12 * terabytes, bytes);
+ ASSERT_FALSE(is_percent);
+
bytes = ParseUtil::ParseMemSpec("13%", &is_percent, MemInfo::physical_mem());
ASSERT_GT(bytes, 0);
ASSERT_TRUE(is_percent);
@@ -91,6 +100,8 @@ TEST(ParseMemSpecs, Basic) {
bad_values.push_back("1Bb");
bad_values.push_back("1%%");
bad_values.push_back("1.1");
+ bad_values.push_back("1pb");
+ bad_values.push_back("1eb");
stringstream ss;
ss << UINT64_MAX;
bad_values.push_back(ss.str());
diff --git a/be/src/util/parse-util.cc b/be/src/util/parse-util.cc
index d17cf3e..75253d7 100644
--- a/be/src/util/parse-util.cc
+++ b/be/src/util/parse-util.cc
@@ -40,6 +40,12 @@ int64_t ParseUtil::ParseMemSpec(const string& mem_spec_str, bool* is_percent,
number_str_len--;
}
switch (*suffix_char) {
+ case 't':
+ case 'T':
+ // Terabytes.
+ number_str_len--;
+ multiplier = 1024L * 1024L * 1024L * 1024L;
+ break;
case 'g':
case 'G':
// Gigabytes.
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index 5e1070d..bb45c3d 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -308,7 +308,8 @@ error_codes = (
("SCRATCH_ALLOCATION_FAILED", 101, "Could not create files in any configured scratch "
"directories (--scratch_dirs=$0) on backend '$1'. $2 of scratch is currently in "
"use by this Impala Daemon ($3 by this query). See logs for previous errors that "
- "may have prevented creating or writing scratch files."),
+ "may have prevented creating or writing scratch files. The following directories "
+ "were at capacity: $4"),
("SCRATCH_READ_TRUNCATED", 102, "Error reading $0 bytes from scratch file '$1' "
"on backend $2 at offset $3: could only read $4 bytes"),
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index bd30674..1ce9264 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -2151,6 +2151,16 @@
"key": "tmp-file-mgr.scratch-space-bytes-used-high-water-mark"
},
{
+ "description": "The current total spilled bytes for a single scratch directory.",
+ "contexts": [
+ "IMPALAD"
+ ],
+ "label": "Per-directory scratch space bytes used",
+ "units": "BYTES",
+ "kind": "GAUGE",
+ "key": "tmp-file-mgr.scratch-space-bytes-used.dir-$0"
+ },
+ {
"description": "Number of senders waiting for receiving fragment to initialize",
"contexts": [
"IMPALAD"
diff --git a/docs/topics/impala_mem_limit.xml b/docs/topics/impala_mem_limit.xml
index 2bd89e5..d61edf3 100644
--- a/docs/topics/impala_mem_limit.xml
+++ b/docs/topics/impala_mem_limit.xml
@@ -167,10 +167,10 @@ MEM_LIMIT set to 3mb
</p>
<codeblock rev="">
-[localhost:21000] > set mem_limit=3tb;
-MEM_LIMIT set to 3tb
+[localhost:21000] > set mem_limit=3pb;
+MEM_LIMIT set to 3pb
[localhost:21000] > select 5;
-ERROR: Failed to parse query memory limit from '3tb'.
+ERROR: Failed to parse query memory limit from '3pb'.
[localhost:21000] > set mem_limit=xyz;
MEM_LIMIT set to xyz