You are viewing a plain-text version of this content. (The link to its canonical version was lost in the plain-text conversion.)
Posted to commits@kudu.apache.org by aw...@apache.org on 2019/08/27 22:46:09 UTC
[kudu] branch master updated: KUDU-2907: add a dir to dir groups when full
This is an automated email from the ASF dual-hosted git repository.
awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new ae80d16 KUDU-2907: add a dir to dir groups when full
ae80d16 is described below
commit ae80d16037a5df14b4d35a76655564dae7dffe24
Author: Andrew Wong <aw...@apache.org>
AuthorDate: Wed Aug 21 14:36:19 2019 -0700
KUDU-2907: add a dir to dir groups when full
This patch extends the directory-picking behavior used when selecting
directories for new blocks: if all directories in a tablet's data dir
group are full, the group may now be expanded with a healthy, non-full
directory, if one is available.
This doesn't change the policy for picking directories for groups; it
only extends the use of that policy so that directories may be added to
a group when getting a directory for a new block.
Change-Id: Ic8e05bdfe1fc2ac0e7152d493f289d3ac6e850d7
Reviewed-on: http://gerrit.cloudera.org:8080/14122
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Andrew Wong <aw...@cloudera.com>
---
src/kudu/fs/data_dirs-test.cc | 105 ++++++++++++++++++++--
src/kudu/fs/data_dirs.cc | 157 ++++++++++++++++++++++-----------
src/kudu/fs/data_dirs.h | 27 ++++--
src/kudu/fs/file_block_manager.cc | 2 +-
src/kudu/fs/log_block_manager.cc | 2 +-
src/kudu/tserver/tablet_server-test.cc | 68 ++++++++++++++
src/kudu/util/env_util.cc | 49 ++++++++--
7 files changed, 336 insertions(+), 74 deletions(-)
diff --git a/src/kudu/fs/data_dirs-test.cc b/src/kudu/fs/data_dirs-test.cc
index 078a884..9fa9027 100644
--- a/src/kudu/fs/data_dirs-test.cc
+++ b/src/kudu/fs/data_dirs-test.cc
@@ -21,7 +21,9 @@
#include <ostream>
#include <set>
#include <string>
+#include <thread>
#include <unordered_map>
+#include <unordered_set>
#include <utility>
#include <vector>
@@ -37,6 +39,7 @@
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/barrier.h"
#include "kudu/util/env.h"
#include "kudu/util/env_util.h"
#include "kudu/util/metrics.h"
@@ -47,17 +50,21 @@
using std::set;
using std::string;
+using std::thread;
using std::vector;
using std::unique_ptr;
+using std::unordered_set;
using strings::Substitute;
DECLARE_bool(crash_on_eio);
DECLARE_double(env_inject_eio);
+DECLARE_double(env_inject_full);
DECLARE_int32(fs_data_dirs_available_space_cache_seconds);
DECLARE_int32(fs_target_data_dirs_per_tablet);
DECLARE_int64(disk_reserved_bytes_free_for_testing);
DECLARE_int64(fs_data_dirs_reserved_bytes);
DECLARE_string(env_inject_eio_globs);
+DECLARE_string(env_inject_full_globs);
METRIC_DECLARE_gauge_uint64(data_dirs_failed);
@@ -108,7 +115,7 @@ TEST_F(DataDirsTest, TestCreateGroup) {
// Test that the DataDirManager doesn't know about the tablets we're about
// to insert.
DataDir* dd = nullptr;
- Status s = dd_manager_->GetNextDataDir(test_block_opts_, &dd);
+ Status s = dd_manager_->GetDirAddIfNecessary(test_block_opts_, &dd);
ASSERT_EQ(nullptr, dd);
ASSERT_TRUE(s.IsNotFound()) << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(), "Tried to get directory but no directory group "
@@ -145,7 +152,7 @@ TEST_F(DataDirsTest, TestCreateGroup) {
ASSERT_EQ(FLAGS_fs_target_data_dirs_per_tablet, num_dirs_with_tablets);
// Try to use the group.
- ASSERT_OK(dd_manager_->GetNextDataDir(test_block_opts_, &dd));
+ ASSERT_OK(dd_manager_->GetDirAddIfNecessary(test_block_opts_, &dd));
ASSERT_FALSE(dd->is_full());
}
@@ -177,26 +184,108 @@ TEST_F(DataDirsTest, TestLoadFromPB) {
TEST_F(DataDirsTest, TestDeleteDataDirGroup) {
ASSERT_OK(dd_manager_->CreateDataDirGroup(test_tablet_name_));
DataDir* dd;
- ASSERT_OK(dd_manager_->GetNextDataDir(test_block_opts_, &dd));
+ ASSERT_OK(dd_manager_->GetDirAddIfNecessary(test_block_opts_, &dd));
ASSERT_FALSE(dd->is_full());
dd_manager_->DeleteDataDirGroup(test_tablet_name_);
- Status s = dd_manager_->GetNextDataDir(test_block_opts_, &dd);
+ Status s = dd_manager_->GetDirAddIfNecessary(test_block_opts_, &dd);
ASSERT_FALSE(dd->is_full());
ASSERT_TRUE(s.IsNotFound()) << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(), "Tried to get directory but no directory group "
"registered for tablet");
}
+// Inject full disk errors a couple different ways and make sure that we can't
+// create directory groups.
TEST_F(DataDirsTest, TestFullDisk) {
+ FLAGS_env_inject_full = 1.0;
+ Status s = dd_manager_->CreateDataDirGroup(test_tablet_name_);
+ ASSERT_TRUE(s.IsIOError()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "All healthy data directories are full");
+ FLAGS_env_inject_full = 0;
+
FLAGS_fs_data_dirs_available_space_cache_seconds = 0; // Don't cache device available space.
FLAGS_fs_data_dirs_reserved_bytes = 1; // Reserved space.
FLAGS_disk_reserved_bytes_free_for_testing = 0; // Free space.
- Status s = dd_manager_->CreateDataDirGroup(test_tablet_name_);
+ s = dd_manager_->CreateDataDirGroup(test_tablet_name_);
ASSERT_TRUE(s.IsIOError()) << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(), "All healthy data directories are full");
}
+// Test that when every directory in a tablet's group is full, a healthy directory
+// is added to the group, and that an error is returned once every dir is full.
+TEST_F(DataDirsTest, TestFullDiskGrowsGroup) {
+  // First, create a directory group.
+  ASSERT_OK(dd_manager_->CreateDataDirGroup(test_tablet_name_));
+
+  // Mark every directory in the directory group full.
+  vector<string> data_dir_group;
+  ASSERT_OK(dd_manager_->FindDataDirsByTabletId(test_tablet_name_, &data_dir_group));
+  FLAGS_fs_data_dirs_available_space_cache_seconds = 0;  // Don't cache available space.
+  FLAGS_env_inject_full_globs = JoinStrings(data_dir_group, ",");
+  FLAGS_env_inject_full = 1.0;
+
+  // Try getting a new directory, adding if necessary.
+  DataDir* new_dir = nullptr;
+  ASSERT_OK(dd_manager_->GetDirAddIfNecessary(test_block_opts_, &new_dir));
+  unordered_set<string> old_dirs(data_dir_group.begin(), data_dir_group.end());
+  ASSERT_FALSE(ContainsKey(old_dirs, new_dir->dir()));
+
+  // The selected directory should now be part of the tablet's directory group.
+  vector<string> new_dir_group;
+  ASSERT_OK(dd_manager_->FindDataDirsByTabletId(test_tablet_name_, &new_dir_group));
+  unordered_set<string> new_dirs(new_dir_group.begin(), new_dir_group.end());
+  ASSERT_TRUE(ContainsKey(new_dirs, new_dir->dir()));
+
+  // If all directories are full, we shouldn't be able to get a new directory.
+  FLAGS_env_inject_full_globs = "*";
+  Status s = dd_manager_->GetDirAddIfNecessary(test_block_opts_, &new_dir);
+  ASSERT_TRUE(s.IsIOError()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "No directories available");
+}
+
+// Test that concurrently adding dirs to a full data dir group adds exactly one
+// dir, and that every caller gets that newly added dir.
+TEST_F(DataDirsTest, TestGrowGroupInParallel) {
+  // Set up a data dir group with no space so we can add to it.
+  ASSERT_OK(dd_manager_->CreateDataDirGroup(test_tablet_name_));
+  vector<string> data_dir_group;
+  ASSERT_OK(dd_manager_->FindDataDirsByTabletId(test_tablet_name_, &data_dir_group));
+  FLAGS_fs_data_dirs_available_space_cache_seconds = 0;
+  FLAGS_env_inject_full_globs = JoinStrings(data_dir_group, ",");
+  FLAGS_env_inject_full = 1.0;
+
+  // In parallel, try adding a directory to the group.
+  const int kNumThreads = 32;
+  vector<thread> threads;
+  vector<Status> statuses(kNumThreads);
+  vector<DataDir*> dds(kNumThreads);
+  Barrier b(kNumThreads);  // Release all threads together to maximize contention.
+  for (int i = 0; i < kNumThreads; i++) {
+    threads.emplace_back([&, i] {
+      b.Wait();
+      statuses[i] = dd_manager_->GetDirAddIfNecessary(test_block_opts_, &dds[i]);
+    });
+  }
+  for (auto& t : threads) {
+    t.join();
+  }
+  for (const auto& s : statuses) {
+    ASSERT_OK(s);  // Fatal: the checks below dereference the returned dirs.
+  }
+  // Check that we added exactly one directory.
+  unordered_set<string> old_dirs(data_dir_group.begin(), data_dir_group.end());
+  ASSERT_OK(dd_manager_->FindDataDirsByTabletId(test_tablet_name_, &data_dir_group));
+  ASSERT_EQ(FLAGS_fs_target_data_dirs_per_tablet + 1, data_dir_group.size());
+  DataDir* new_dir = dds[0];
+  ASSERT_FALSE(ContainsKey(old_dirs, new_dir->dir()));
+
+  // All returned data directories should have been the newly added one.
+  for (int i = 1; i < kNumThreads; i++) {
+    ASSERT_EQ(new_dir, dds[i]);
+  }
+}
+
TEST_F(DataDirsTest, TestFailedDirNotReturned) {
FLAGS_fs_target_data_dirs_per_tablet = 2;
ASSERT_OK(dd_manager_->CreateDataDirGroup(test_tablet_name_));
@@ -204,7 +293,7 @@ TEST_F(DataDirsTest, TestFailedDirNotReturned) {
DataDir* failed_dd;
int uuid_idx;
// Fail one of the directories in the group and verify that it is not used.
- ASSERT_OK(dd_manager_->GetNextDataDir(test_block_opts_, &failed_dd));
+ ASSERT_OK(dd_manager_->GetDirAddIfNecessary(test_block_opts_, &failed_dd));
ASSERT_TRUE(dd_manager_->FindUuidIndexByDataDir(failed_dd, &uuid_idx));
// These calls are idempotent.
ASSERT_OK(dd_manager_->MarkDataDirFailed(uuid_idx));
@@ -213,7 +302,7 @@ TEST_F(DataDirsTest, TestFailedDirNotReturned) {
ASSERT_EQ(1, down_cast<AtomicGauge<uint64_t>*>(
entity_->FindOrNull(METRIC_data_dirs_failed).get())->value());
for (int i = 0; i < 10; i++) {
- ASSERT_OK(dd_manager_->GetNextDataDir(test_block_opts_, &dd));
+ ASSERT_OK(dd_manager_->GetDirAddIfNecessary(test_block_opts_, &dd));
ASSERT_NE(dd, failed_dd);
}
@@ -222,7 +311,7 @@ TEST_F(DataDirsTest, TestFailedDirNotReturned) {
ASSERT_OK(dd_manager_->MarkDataDirFailed(uuid_idx));
ASSERT_EQ(2, down_cast<AtomicGauge<uint64_t>*>(
entity_->FindOrNull(METRIC_data_dirs_failed).get())->value());
- Status s = dd_manager_->GetNextDataDir(test_block_opts_, &failed_dd);
+ Status s = dd_manager_->GetDirAddIfNecessary(test_block_opts_, &failed_dd);
ASSERT_TRUE(s.IsIOError());
ASSERT_STR_CONTAINS(s.ToString(), "No healthy directories exist in tablet's directory group");
}
diff --git a/src/kudu/fs/data_dirs.cc b/src/kudu/fs/data_dirs.cc
index f365e38..7263c09 100644
--- a/src/kudu/fs/data_dirs.cc
+++ b/src/kudu/fs/data_dirs.cc
@@ -24,7 +24,6 @@
#include <iterator>
#include <memory>
#include <mutex>
-#include <numeric>
#include <ostream>
#include <random>
#include <string>
@@ -116,7 +115,6 @@ namespace fs {
using internal::DataDirGroup;
using std::default_random_engine;
-using std::iota;
using std::pair;
using std::set;
using std::shuffle;
@@ -948,12 +946,12 @@ Status DataDirManager::CreateDataDirGroup(const string& tablet_id,
return Status::OK();
}
-Status DataDirManager::GetNextDataDir(const CreateBlockOptions& opts, DataDir** dir) {
+Status DataDirManager::GetDirForBlock(const CreateBlockOptions& opts, DataDir** dir,
+ int* new_target_group_size) const {
shared_lock<rw_spinlock> lock(dir_group_lock_.get_lock());
- const vector<int>* group_uuid_indices;
- vector<int> valid_uuid_indices;
- DataDirGroup* group = nullptr;
-
+ vector<int> healthy_uuid_indices;
+ const DataDirGroup* group = nullptr;
+ DataDirGroup group_for_tests;
if (PREDICT_TRUE(!opts.tablet_id.empty())) {
// Get the data dir group for the tablet.
group = FindOrNull(group_by_tablet_map_, opts.tablet_id);
@@ -961,29 +959,29 @@ Status DataDirManager::GetNextDataDir(const CreateBlockOptions& opts, DataDir**
return Status::NotFound("Tried to get directory but no directory group "
"registered for tablet", opts.tablet_id);
}
- if (PREDICT_TRUE(failed_data_dirs_.empty())) {
- group_uuid_indices = &group->uuid_indices();
- } else {
- RemoveUnhealthyDataDirsUnlocked(group->uuid_indices(), &valid_uuid_indices);
- group_uuid_indices = &valid_uuid_indices;
- if (valid_uuid_indices.empty()) {
- return Status::IOError("No healthy directories exist in tablet's "
- "directory group", opts.tablet_id, ENODEV);
- }
- }
} else {
// This should only be reached by some tests; in cases where there is no
// natural tablet_id, select a data dir from any of the directories.
CHECK(IsGTest());
- AppendKeysFromMap(data_dir_by_uuid_idx_, &valid_uuid_indices);
- group_uuid_indices = &valid_uuid_indices;
+ vector<int> all_uuid_indices;
+ AppendKeysFromMap(data_dir_by_uuid_idx_, &all_uuid_indices);
+ group_for_tests = DataDirGroup(std::move(all_uuid_indices));
+ group = &group_for_tests;
}
-
- // In a certain group of dirs, randomly choose two not full dirs and select the one
- // with more free space.
+ // Within a given directory group, filter out the ones that are failed.
+ if (PREDICT_TRUE(failed_data_dirs_.empty())) {
+ healthy_uuid_indices = group->uuid_indices();
+ } else {
+ RemoveUnhealthyDataDirsUnlocked(group->uuid_indices(), &healthy_uuid_indices);
+ if (healthy_uuid_indices.empty()) {
+ return Status::IOError("No healthy directories exist in tablet's "
+ "directory group", opts.tablet_id, ENODEV);
+ }
+ }
+ // Within a given directory group, filter out the ones that are full.
vector<DataDir*> candidate_dirs;
- for (auto uuid_idx : *group_uuid_indices) {
- DataDir *candidate = FindOrDie(data_dir_by_uuid_idx_, uuid_idx);
+ for (auto uuid_idx : healthy_uuid_indices) {
+ DataDir* candidate = FindOrDie(data_dir_by_uuid_idx_, uuid_idx);
Status s = candidate->RefreshAvailableSpace(DataDir::RefreshMode::EXPIRED_ONLY);
WARN_NOT_OK(s, Substitute("failed to refresh fullness of $0", candidate->dir()));
if (s.ok() && !candidate->is_full()) {
@@ -991,35 +989,33 @@ Status DataDirManager::GetNextDataDir(const CreateBlockOptions& opts, DataDir**
}
}
- if (candidate_dirs.size() == 1) {
- *dir = candidate_dirs[0];
- return Status::OK();
- }
-
- if (candidate_dirs.size() >= 2) {
- shuffle(candidate_dirs.begin(), candidate_dirs.end(), default_random_engine(rng_.Next()));
- *dir = (candidate_dirs[0]->available_bytes() > candidate_dirs[1]->available_bytes()) ?
- candidate_dirs[0] : candidate_dirs[1];
- return Status::OK();
- }
-
- // All healthy directories are full. Return an error.
- if (PREDICT_TRUE(!opts.tablet_id.empty())) {
+ // If all the directories in the group are full, return an ENOSPC error.
+ if (PREDICT_FALSE(candidate_dirs.empty())) {
DCHECK(group);
size_t num_total = group->uuid_indices().size();
- size_t num_full = valid_uuid_indices.size();
+ size_t num_full = 0;
+ for (const auto& idx : group->uuid_indices()) {
+ if (FindOrDie(data_dir_by_uuid_idx_, idx)->is_full()) {
+ num_full++;
+ }
+ }
size_t num_failed = num_total - num_full;
+ *new_target_group_size = num_total + 1;
return Status::IOError(
Substitute("No directories available in $0's directory group ($1 dirs "
"total, $2 failed, $3 full)",
opts.tablet_id, num_total, num_failed, num_full),
"", ENOSPC);
}
-
- return Status::IOError(
- Substitute("No directories available ($0 dirs, all of which are full)",
- valid_uuid_indices.size()),
- "", ENOSPC);
+ if (candidate_dirs.size() == 1) {
+ *dir = candidate_dirs[0];
+ return Status::OK();
+ }
+ // Pick two randomly and select the one with more space.
+ shuffle(candidate_dirs.begin(), candidate_dirs.end(), default_random_engine(rng_.Next()));
+ *dir = (candidate_dirs[0]->available_bytes() > candidate_dirs[1]->available_bytes()) ?
+ candidate_dirs[0] : candidate_dirs[1];
+ return Status::OK();
}
void DataDirManager::DeleteDataDirGroup(const std::string& tablet_id) {
@@ -1047,21 +1043,83 @@ Status DataDirManager::GetDataDirGroupPB(const string& tablet_id,
return Status::OK();
}
+Status DataDirManager::GetDirAddIfNecessary(const CreateBlockOptions& opts, DataDir** dir) {
+  int new_target_group_size = 0;  // Set by GetDirForBlock() only when it returns ENOSPC.
+  Status s = GetDirForBlock(opts, dir, &new_target_group_size);
+  if (PREDICT_TRUE(s.ok())) {
+    return Status::OK();
+  }
+  const string& tablet_id = opts.tablet_id;
+  if (tablet_id.empty()) {
+    // This should only be reached by some tests, i.e. in cases where there is
+    // no natural tablet_id. Just return whatever error we got.
+    CHECK(IsGTest());
+    return s;
+  }
+  // If we failed for a reason other than lack of space in the data dir group
+  // (e.g. NotFound, or ENODEV when no dir in the group is healthy), return it.
+  if (!s.IsIOError() || s.posix_code() != ENOSPC) {
+    return s;
+  }
+  // The group is out of space; try adding a new directory to it. NOTE(review):
+  // FindOrDie below assumes the group was not deleted since GetDirForBlock().
+  DCHECK_GT(new_target_group_size, 0);
+  std::lock_guard<percpu_rwlock> l(dir_group_lock_);  // Exclusive: we may modify groups.
+  const DataDirGroup& group = FindOrDie(group_by_tablet_map_, tablet_id);
+  // GetDirForBlock()'s shared lock has been released, so another thread may have
+  // grown the group already; if so, return the most recently added directory.
+  if (new_target_group_size <= group.uuid_indices().size()) {
+    *dir = FindOrDie(data_dir_by_uuid_idx_, group.uuid_indices().back());
+    return Status::OK();  // NOTE(review): that dir's fullness isn't re-checked here.
+  }
+  vector<int> group_uuid_indices = group.uuid_indices();  // Copy, then extend.
+  GetDirsForGroupUnlocked(new_target_group_size, &group_uuid_indices);
+  if (PREDICT_FALSE(group_uuid_indices.size() < new_target_group_size)) {
+    // No healthy, non-full dir could be added; report the state of all dirs.
+    int num_total = data_dirs_.size();
+    int num_failed = failed_data_dirs_.size();
+    int num_full = 0;
+    for (const auto& dd : data_dirs_) {
+      if (dd->is_full()) num_full++;
+    }
+    return Status::IOError(
+        Substitute("No directories could be added ($0 dirs total, $1 failed, $2 full): $3",
+                   num_total, num_failed, num_full, s.ToString()), "", ENODEV);
+  }
+  // Update the groups. InsertOrDie enforces that the tablet isn't associated with
+  // the new dir already; the CHECK enforces that an existing group is replaced.
+  // NOTE: it is not the responsibility of the DataDirManager to persist groups
+  // to disk. On-disk representations of data dir groups (e.g. those in the
+  // TabletSuperBlockPB) should be re-written upon modifying them.
+  int new_uuid_idx = group_uuid_indices.back();
+  InsertOrDie(&FindOrDie(tablets_by_uuid_idx_map_, new_uuid_idx), tablet_id);
+  CHECK(!EmplaceOrUpdate(&group_by_tablet_map_, tablet_id, DataDirGroup(group_uuid_indices)));
+  *dir = FindOrDie(data_dir_by_uuid_idx_, new_uuid_idx);
+  LOG(INFO) << Substitute("Added $0 to $1's directory group: $2",
+                          (*dir)->dir(), tablet_id, s.ToString());
+  return Status::OK();
+}
+
void DataDirManager::GetDirsForGroupUnlocked(int target_size,
vector<int>* group_indices) {
DCHECK(dir_group_lock_.is_locked());
vector<int> candidate_indices;
+ unordered_set<int> existing_group_indices(group_indices->begin(), group_indices->end());
for (auto& e : data_dir_by_uuid_idx_) {
- if (ContainsKey(failed_data_dirs_, e.first)) {
+ int uuid_idx = e.first;
+ DCHECK_LT(uuid_idx, data_dirs_.size());
+ if (ContainsKey(existing_group_indices, uuid_idx) ||
+ ContainsKey(failed_data_dirs_, uuid_idx)) {
continue;
}
- Status s = e.second->RefreshAvailableSpace(DataDir::RefreshMode::ALWAYS);
- WARN_NOT_OK(s, Substitute("failed to refresh fullness of $0", e.second->dir()));
- if (s.ok() && !e.second->is_full()) {
+ DataDir* dd = e.second;
+ Status s = dd->RefreshAvailableSpace(DataDir::RefreshMode::ALWAYS);
+ WARN_NOT_OK(s, Substitute("failed to refresh fullness of $0", dd->dir()));
+ if (s.ok() && !dd->is_full()) {
// TODO(awong): If a disk is unhealthy at the time of group creation, the
// resulting group may be below targeted size. Add functionality to
// resize groups. See KUDU-2040 for more details.
- candidate_indices.push_back(e.first);
+ candidate_indices.push_back(uuid_idx);
}
}
while (group_indices->size() < target_size && !candidate_indices.empty()) {
@@ -1200,7 +1258,6 @@ void DataDirManager::RemoveUnhealthyDataDirsUnlocked(const vector<int>& uuid_ind
}
healthy_indices->clear();
for (int uuid_idx : uuid_indices) {
- DCHECK_LT(uuid_idx, data_dirs_.size());
if (!ContainsKey(failed_data_dirs_, uuid_idx)) {
healthy_indices->emplace_back(uuid_idx);
}
diff --git a/src/kudu/fs/data_dirs.h b/src/kudu/fs/data_dirs.h
index b7f7f0f..0082f99 100644
--- a/src/kudu/fs/data_dirs.h
+++ b/src/kudu/fs/data_dirs.h
@@ -335,10 +335,10 @@ class DataDirManager {
// and data dir to tablet set are cleared of all references to the tablet.
void DeleteDataDirGroup(const std::string& tablet_id);
- // Returns a random directory, giving preference to the one with more free
- // space in the specified option's data dir group. If there is no room in the
- // group, returns an error.
- Status GetNextDataDir(const CreateBlockOptions& opts, DataDir** dir);
+ // Returns a dir for block placement in the data dir group specified in
+ // 'opts'. If none exists, adds a new dir to the group and returns the dir,
+ // and if none can be added, returns an error.
+ Status GetDirAddIfNecessary(const CreateBlockOptions& opts, DataDir** dir);
// Finds the set of tablet_ids in the data dir specified by 'uuid_idx' and
// returns a copy, returning an empty set if none are found.
@@ -464,8 +464,16 @@ class DataDirManager {
std::vector<std::unique_ptr<PathInstanceMetadataFile>> instances_to_update,
std::vector<std::string> new_all_uuids);
+  // Returns a non-full dir from the data dir group specified in 'opts', picking
+  // two candidates at random and preferring the one with more free space. If
+  // every dir in the group is full, returns an IOError with posix code ENOSPC
+  // and sets 'new_target_group_size' to the group's current size plus one.
+  Status GetDirForBlock(const CreateBlockOptions& opts, DataDir** dir,
+                        int* new_target_group_size) const;
+
// Repeatedly selects directories from those available to put into a new
// DataDirGroup until 'group_indices' reaches 'target_size' elements.
+ //
// Selection is based on "The Power of Two Choices in Randomized Load
// Balancing", selecting two directories randomly and choosing the one with
// less load, quantified as the number of unique tablets in the directory.
@@ -473,10 +481,11 @@ class DataDirManager {
// resulting behavior fills directories that have fewer tablets stored on
// them while not completely neglecting those with more tablets.
//
- // 'group_indices' is an output that stores the list of uuid_indices to be
- // added. Although this function does not itself change DataDirManager state,
- // its expected usage warrants that it is called within the scope of a
- // lock_guard of dir_group_lock_.
+ // 'group_indices' is an in/out parameter that stores the list of UUID
+ // indices to be added; UUID indices that are already in 'group_indices' are
+ // not considered. Although this function does not itself change
+ // DataDirManager state, its expected usage warrants that it is called within
+ // the scope of a lock_guard of dir_group_lock_.
void GetDirsForGroupUnlocked(int target_size, std::vector<int>* group_indices);
// Goes through the data dirs in 'uuid_indices' and populates
@@ -529,7 +538,7 @@ class DataDirManager {
mutable percpu_rwlock dir_group_lock_;
// RNG used to select directories.
- ThreadSafeRandom rng_;
+ mutable ThreadSafeRandom rng_;
DISALLOW_COPY_AND_ASSIGN(DataDirManager);
};
diff --git a/src/kudu/fs/file_block_manager.cc b/src/kudu/fs/file_block_manager.cc
index 1b92d57..6686606 100644
--- a/src/kudu/fs/file_block_manager.cc
+++ b/src/kudu/fs/file_block_manager.cc
@@ -748,7 +748,7 @@ Status FileBlockManager::CreateBlock(const CreateBlockOptions& opts,
CHECK(!opts_.read_only);
DataDir* dir;
- RETURN_NOT_OK_EVAL(dd_manager_->GetNextDataDir(opts, &dir),
+ RETURN_NOT_OK_EVAL(dd_manager_->GetDirAddIfNecessary(opts, &dir),
error_manager_->RunErrorNotificationCb(ErrorHandlerType::NO_AVAILABLE_DISKS, opts.tablet_id));
int uuid_idx;
CHECK(dd_manager_->FindUuidIndexByDataDir(dir, &uuid_idx));
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index 38e7b90..a06f319 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -2130,7 +2130,7 @@ void LogBlockManager::RemoveDeadContainer(const string& container_name) {
Status LogBlockManager::GetOrCreateContainer(const CreateBlockOptions& opts,
LogBlockContainerRefPtr* container) {
DataDir* dir;
- RETURN_NOT_OK_EVAL(dd_manager_->GetNextDataDir(opts, &dir),
+ RETURN_NOT_OK_EVAL(dd_manager_->GetDirAddIfNecessary(opts, &dir),
error_manager_->RunErrorNotificationCb(ErrorHandlerType::NO_AVAILABLE_DISKS, opts.tablet_id));
{
diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc
index b72c5d7..a7bdecd 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -28,6 +28,7 @@
#include <sstream>
#include <string>
#include <thread>
+#include <unordered_set>
#include <utility>
#include <vector>
@@ -56,6 +57,7 @@
#include "kudu/consensus/metadata.pb.h"
#include "kudu/consensus/raft_consensus.h"
#include "kudu/fs/block_id.h"
+#include "kudu/fs/data_dirs.h"
#include "kudu/fs/fs-test-util.h"
#include "kudu/fs/fs.pb.h"
#include "kudu/fs/fs_manager.h"
@@ -120,6 +122,7 @@ using kudu::clock::Clock;
using kudu::clock::HybridClock;
using kudu::consensus::ConsensusStatePB;
using kudu::fs::CreateCorruptBlock;
+using kudu::fs::DataDirManager;
using kudu::pb_util::SecureDebugString;
using kudu::pb_util::SecureShortDebugString;
using kudu::rpc::Messenger;
@@ -137,6 +140,7 @@ using std::shared_ptr;
using std::string;
using std::thread;
using std::unique_ptr;
+using std::unordered_set;
using std::vector;
using strings::Substitute;
@@ -157,8 +161,11 @@ DECLARE_bool(fail_dns_resolution);
DECLARE_bool(rowset_metadata_store_keys);
DECLARE_double(cfile_inject_corruption);
DECLARE_double(env_inject_eio);
+DECLARE_double(env_inject_full);
DECLARE_int32(flush_threshold_mb);
DECLARE_int32(flush_threshold_secs);
+DECLARE_int32(fs_data_dirs_available_space_cache_seconds);
+DECLARE_int32(fs_target_data_dirs_per_tablet);
DECLARE_int32(maintenance_manager_num_threads);
DECLARE_int32(metrics_retirement_age_ms);
DECLARE_int32(scanner_batch_size_rows);
@@ -166,6 +173,7 @@ DECLARE_int32(scanner_gc_check_interval_us);
DECLARE_int32(scanner_ttl_ms);
DECLARE_string(block_manager);
DECLARE_string(env_inject_eio_globs);
+DECLARE_string(env_inject_full_globs);
// Declare these metrics prototypes for simpler unit testing of their behavior.
METRIC_DECLARE_counter(block_manager_total_bytes_read);
@@ -574,6 +582,66 @@ TEST_F(TabletServerTest, TestTombstonedTabletOnWebUI) {
ASSERT_STR_NOT_CONTAINS(s, mini_server_->bound_rpc_addr().ToString());
}
+class TabletServerDiskSpaceTest : public TabletServerTestBase,
+                                  public testing::WithParamInterface<string> {
+ public:
+  void SetUp() override {
+    FLAGS_block_manager = GetParam();  // Block manager type, e.g. "log" or "file".
+    NO_FATALS(TabletServerTestBase::SetUp());
+    NO_FATALS(StartTabletServer(/*num_data_dirs=*/kNumDirs));
+  }
+ protected:
+  const int kNumDirs = FLAGS_fs_target_data_dirs_per_tablet + 1;  // Group target + 1 spare.
+};
+
+// Test that when there isn't enough space in a tablet's data directory group
+// and there are additional directories available, directories are added to the
+// group, and the new groups are persisted to disk.
+TEST_P(TabletServerDiskSpaceTest, TestFullGroupAddsDir) {
+  DataDirManager* dd_manager = mini_server_->server()->fs_manager()->dd_manager();
+  vector<string> dir_group;
+  ASSERT_OK(dd_manager->FindDataDirsByTabletId(kTabletId, &dir_group));
+  ASSERT_EQ(kNumDirs - 1, dir_group.size());
+  FLAGS_fs_data_dirs_available_space_cache_seconds = 0;  // Don't cache available space.
+  FLAGS_env_inject_full_globs = JoinStrings(dir_group, ",");  // Only the group's dirs are full.
+  FLAGS_env_inject_full = 1.0;
+
+  // Insert some data and flush. This should lead to the creation of a block,
+  // and the addition of a new directory to the dir group.
+  unordered_set<string> old_group(dir_group.begin(), dir_group.end());
+  NO_FATALS(InsertTestRowsRemote(1, 1));
+  ASSERT_OK(tablet_replica_->tablet()->Flush());
+  ASSERT_OK(dd_manager->FindDataDirsByTabletId(kTabletId, &dir_group));
+  ASSERT_EQ(kNumDirs, dir_group.size());
+
+  // Grab the newly added directory and check that failing it marks the tablet
+  // as being in a failed directory.
+  string new_dir;
+  for (const auto& dir : dir_group) {
+    if (!ContainsKey(old_group, dir)) {
+      new_dir = dir;
+      break;
+    }
+  }
+  ASSERT_FALSE(new_dir.empty());
+  string new_uuid;
+  ASSERT_TRUE(dd_manager->FindUuidByRoot(DirName(new_dir), &new_uuid));
+  dd_manager->MarkDataDirFailedByUuid(new_uuid);
+  ASSERT_TRUE(dd_manager->IsTabletInFailedDir(kTabletId));
+
+  // The updated group should persist across a tablet server restart.
+  NO_FATALS(ShutdownAndRebuildTablet(kNumDirs));
+  dd_manager = mini_server_->server()->fs_manager()->dd_manager();  // Re-fetch after restart.
+  ASSERT_OK(dd_manager->FindDataDirsByTabletId(kTabletId, &dir_group));
+  ASSERT_EQ(kNumDirs, dir_group.size());
+  ASSERT_TRUE(dd_manager->FindUuidByRoot(DirName(new_dir), &new_uuid));
+  dd_manager->MarkDataDirFailedByUuid(new_uuid);
+  ASSERT_TRUE(dd_manager->IsTabletInFailedDir(kTabletId));
+}
+
+INSTANTIATE_TEST_CASE_P(BlockManager, TabletServerDiskSpaceTest,
+                        ::testing::Values("log", "file"));  // Run once per block manager.
+
enum class ErrorType {
DISK_FAILURE,
CFILE_CORRUPTION
diff --git a/src/kudu/util/env_util.cc b/src/kudu/util/env_util.cc
index 2fc3b1d..6fb8b8a 100644
--- a/src/kudu/util/env_util.cc
+++ b/src/kudu/util/env_util.cc
@@ -17,9 +17,11 @@
#include "kudu/util/env_util.h"
+#include <fnmatch.h>
+
#include <algorithm>
-#include <cstdint>
#include <cerrno>
+#include <cstdint>
#include <ctime>
#include <memory>
#include <string>
@@ -33,12 +35,14 @@
#include "kudu/gutil/bind.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/split.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/strings/util.h"
#include "kudu/util/env.h"
+#include "kudu/util/fault_injection.h"
#include "kudu/util/flag_tags.h"
-#include "kudu/util/slice.h"
#include "kudu/util/path_util.h"
+#include "kudu/util/slice.h"
#include "kudu/util/status.h"
DEFINE_int64(disk_reserved_bytes_free_for_testing, -1,
@@ -70,6 +74,18 @@ TAG_FLAG(disk_reserved_override_prefix_2_bytes_free_for_testing, unsafe);
TAG_FLAG(disk_reserved_override_prefix_1_bytes_free_for_testing, runtime);
TAG_FLAG(disk_reserved_override_prefix_2_bytes_free_for_testing, runtime);
+DEFINE_double(env_inject_full, 0.0,  // 0.0 never injects.
+              "Fraction of the time that space checks on certain paths will "
+              "yield the posix code ENOSPC.");
+TAG_FLAG(env_inject_full, hidden);
+
+DEFINE_string(env_inject_full_globs, "*",  // "*" matches any path (fnmatch, no flags).
+              "Comma-separated list of glob patterns specifying which paths "
+              "return with space errors.");
+TAG_FLAG(env_inject_full_globs, hidden);
+
+
+using kudu::fault_injection::MaybeTrue;
using std::pair;
using std::shared_ptr;
using std::string;
@@ -78,7 +94,25 @@ using std::unordered_set;
using std::vector;
using strings::Substitute;
+
namespace kudu {
+
+namespace {
+// Returns true if a space check on 'data_dir' should fail with an injected ENOSPC.
+bool ShouldInjectSpaceError(const string& data_dir) {
+  if (PREDICT_FALSE(MaybeTrue(FLAGS_env_inject_full))) {  // ~FLAGS_env_inject_full of calls.
+    vector<string> globs =
+        strings::Split(FLAGS_env_inject_full_globs, ",", strings::SkipEmpty());
+    for (const auto& glob : globs) {
+      if (fnmatch(glob.c_str(), data_dir.c_str(), 0) == 0) {  // flags=0: '*' also matches '/'.
+        return true;
+      }
+    }
+  }
+  return false;
+}
+} // anonymous namespace
+
namespace env_util {
Status OpenFileForWrite(Env* env, const string& path,
@@ -129,9 +163,14 @@ static void OverrideBytesFreeWithTestingFlags(const string& path, int64_t* bytes
Status VerifySufficientDiskSpace(Env *env, const std::string& path, int64_t requested_bytes,
int64_t reserved_bytes, int64_t* available_bytes) {
- const int64_t kOnePercentReservation = -1;
DCHECK_GE(requested_bytes, 0);
-
+ if (ShouldInjectSpaceError(path)) {
+ if (available_bytes) {
+ *available_bytes = 0;
+ }
+ return Status::IOError(Env::kInjectedFailureStatusMsg, "", ENOSPC);
+ }
+ const int64_t kOnePercentReservation = -1;
SpaceInfo space_info;
RETURN_NOT_OK(env->GetSpaceInfo(path, &space_info));
int64_t free_bytes = space_info.free_bytes;
@@ -150,7 +189,7 @@ Status VerifySufficientDiskSpace(Env *env, const std::string& path, int64_t requ
reserved_bytes = space_info.capacity_bytes / 100;
}
- if (available_bytes != nullptr) {
+ if (available_bytes) {
*available_bytes = free_bytes;
}