You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2019/08/27 22:46:09 UTC

[kudu] branch master updated: KUDU-2907: add a dir to dir groups when full

This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new ae80d16  KUDU-2907: add a dir to dir groups when full
ae80d16 is described below

commit ae80d16037a5df14b4d35a76655564dae7dffe24
Author: Andrew Wong <aw...@apache.org>
AuthorDate: Wed Aug 21 14:36:19 2019 -0700

    KUDU-2907: add a dir to dir groups when full
    
    This patch extends the directory-picking behavior when selecting
    directories for new blocks if all directories in the given tablet's
    group are full. It now allows the expansion of a tablet's data dir group
    if healthy directories are available.
    
    This doesn't change the policy for picking directories for groups; it
    only extends the usage of the policy to potentially add directories when
    getting a directory for a new block.
    
    Change-Id: Ic8e05bdfe1fc2ac0e7152d493f289d3ac6e850d7
    Reviewed-on: http://gerrit.cloudera.org:8080/14122
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
    Tested-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/fs/data_dirs-test.cc          | 105 ++++++++++++++++++++--
 src/kudu/fs/data_dirs.cc               | 157 ++++++++++++++++++++++-----------
 src/kudu/fs/data_dirs.h                |  27 ++++--
 src/kudu/fs/file_block_manager.cc      |   2 +-
 src/kudu/fs/log_block_manager.cc       |   2 +-
 src/kudu/tserver/tablet_server-test.cc |  68 ++++++++++++++
 src/kudu/util/env_util.cc              |  49 ++++++++--
 7 files changed, 336 insertions(+), 74 deletions(-)

diff --git a/src/kudu/fs/data_dirs-test.cc b/src/kudu/fs/data_dirs-test.cc
index 078a884..9fa9027 100644
--- a/src/kudu/fs/data_dirs-test.cc
+++ b/src/kudu/fs/data_dirs-test.cc
@@ -21,7 +21,9 @@
 #include <ostream>
 #include <set>
 #include <string>
+#include <thread>
 #include <unordered_map>
+#include <unordered_set>
 #include <utility>
 #include <vector>
 
@@ -37,6 +39,7 @@
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/barrier.h"
 #include "kudu/util/env.h"
 #include "kudu/util/env_util.h"
 #include "kudu/util/metrics.h"
@@ -47,17 +50,21 @@
 
 using std::set;
 using std::string;
+using std::thread;
 using std::vector;
 using std::unique_ptr;
+using std::unordered_set;
 using strings::Substitute;
 
 DECLARE_bool(crash_on_eio);
 DECLARE_double(env_inject_eio);
+DECLARE_double(env_inject_full);
 DECLARE_int32(fs_data_dirs_available_space_cache_seconds);
 DECLARE_int32(fs_target_data_dirs_per_tablet);
 DECLARE_int64(disk_reserved_bytes_free_for_testing);
 DECLARE_int64(fs_data_dirs_reserved_bytes);
 DECLARE_string(env_inject_eio_globs);
+DECLARE_string(env_inject_full_globs);
 
 METRIC_DECLARE_gauge_uint64(data_dirs_failed);
 
@@ -108,7 +115,7 @@ TEST_F(DataDirsTest, TestCreateGroup) {
   // Test that the DataDirManager doesn't know about the tablets we're about
   // to insert.
   DataDir* dd = nullptr;
-  Status s = dd_manager_->GetNextDataDir(test_block_opts_, &dd);
+  Status s = dd_manager_->GetDirAddIfNecessary(test_block_opts_, &dd);
   ASSERT_EQ(nullptr, dd);
   ASSERT_TRUE(s.IsNotFound()) << s.ToString();
   ASSERT_STR_CONTAINS(s.ToString(), "Tried to get directory but no directory group "
@@ -145,7 +152,7 @@ TEST_F(DataDirsTest, TestCreateGroup) {
   ASSERT_EQ(FLAGS_fs_target_data_dirs_per_tablet, num_dirs_with_tablets);
 
   // Try to use the group.
-  ASSERT_OK(dd_manager_->GetNextDataDir(test_block_opts_, &dd));
+  ASSERT_OK(dd_manager_->GetDirAddIfNecessary(test_block_opts_, &dd));
   ASSERT_FALSE(dd->is_full());
 }
 
@@ -177,26 +184,108 @@ TEST_F(DataDirsTest, TestLoadFromPB) {
 TEST_F(DataDirsTest, TestDeleteDataDirGroup) {
   ASSERT_OK(dd_manager_->CreateDataDirGroup(test_tablet_name_));
   DataDir* dd;
-  ASSERT_OK(dd_manager_->GetNextDataDir(test_block_opts_, &dd));
+  ASSERT_OK(dd_manager_->GetDirAddIfNecessary(test_block_opts_, &dd));
   ASSERT_FALSE(dd->is_full());
   dd_manager_->DeleteDataDirGroup(test_tablet_name_);
-  Status s = dd_manager_->GetNextDataDir(test_block_opts_, &dd);
+  Status s = dd_manager_->GetDirAddIfNecessary(test_block_opts_, &dd);
   ASSERT_FALSE(dd->is_full());
   ASSERT_TRUE(s.IsNotFound()) << s.ToString();
   ASSERT_STR_CONTAINS(s.ToString(), "Tried to get directory but no directory group "
                                     "registered for tablet");
 }
 
+// Inject full disk errors a couple different ways and make sure that we can't
+// create directory groups.
 TEST_F(DataDirsTest, TestFullDisk) {
+  FLAGS_env_inject_full = 1.0;
+  Status s = dd_manager_->CreateDataDirGroup(test_tablet_name_);
+  ASSERT_TRUE(s.IsIOError()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "All healthy data directories are full");
+  FLAGS_env_inject_full = 0;
+
   FLAGS_fs_data_dirs_available_space_cache_seconds = 0; // Don't cache device available space.
   FLAGS_fs_data_dirs_reserved_bytes = 1;                // Reserved space.
   FLAGS_disk_reserved_bytes_free_for_testing = 0;       // Free space.
 
-  Status s = dd_manager_->CreateDataDirGroup(test_tablet_name_);
+  s = dd_manager_->CreateDataDirGroup(test_tablet_name_);
   ASSERT_TRUE(s.IsIOError()) << s.ToString();
   ASSERT_STR_CONTAINS(s.ToString(), "All healthy data directories are full");
 }
 
+// Test that when an entire directory group is full, new directories are added
+// to the group.
+TEST_F(DataDirsTest, TestFullDiskGrowsGroup) {
+  // First, create a directory group.
+  ASSERT_OK(dd_manager_->CreateDataDirGroup(test_tablet_name_));
+
+  // Mark every directory in the directory group full.
+  vector<string> data_dir_group;
+  ASSERT_OK(dd_manager_->FindDataDirsByTabletId(test_tablet_name_, &data_dir_group));
+  FLAGS_fs_data_dirs_available_space_cache_seconds = 0;
+  FLAGS_env_inject_full_globs = JoinStrings(data_dir_group, ",");
+  FLAGS_env_inject_full = 1.0;
+
+  // Try getting a new directory, adding if necessary.
+  DataDir* new_dir;
+  ASSERT_OK(dd_manager_->GetDirAddIfNecessary(test_block_opts_, &new_dir));
+  unordered_set<string> old_dirs(data_dir_group.begin(), data_dir_group.end());
+  ASSERT_FALSE(ContainsKey(old_dirs, new_dir->dir()));
+
+  // The selected directory should have been added to the data directory group.
+  vector<string> new_dir_group;
+  ASSERT_OK(dd_manager_->FindDataDirsByTabletId(test_tablet_name_, &new_dir_group));
+  unordered_set<string> new_dirs(new_dir_group.begin(), new_dir_group.end());
+  ASSERT_TRUE(ContainsKey(new_dirs, new_dir->dir()));
+
+  // If all directories are full, we shouldn't be able to get a new directory.
+  FLAGS_env_inject_full_globs = "*";
+  Status s = dd_manager_->GetDirAddIfNecessary(test_block_opts_, &new_dir);
+  ASSERT_STR_CONTAINS(s.ToString(), "No directories available");
+  ASSERT_TRUE(s.IsIOError());
+}
+
+// Test that concurrently adding dirs to a data dir group yields the expected
+// number of dirs added.
+TEST_F(DataDirsTest, TestGrowGroupInParallel) {
+  // Set up a data dir group with no space so we can add to it.
+  ASSERT_OK(dd_manager_->CreateDataDirGroup(test_tablet_name_));
+  vector<string> data_dir_group;
+  ASSERT_OK(dd_manager_->FindDataDirsByTabletId(test_tablet_name_, &data_dir_group));
+  FLAGS_fs_data_dirs_available_space_cache_seconds = 0;
+  FLAGS_env_inject_full_globs = JoinStrings(data_dir_group, ",");
+  FLAGS_env_inject_full = 1.0;
+
+  // In parallel, try adding a directory to the group.
+  const int kNumThreads = 32;
+  vector<thread> threads;
+  vector<Status> statuses(kNumThreads);
+  vector<DataDir*> dds(kNumThreads);
+  Barrier b(kNumThreads);
+  for (int i = 0; i < kNumThreads; i++) {
+    threads.emplace_back([&, i] {
+      b.Wait();
+      statuses[i] = dd_manager_->GetDirAddIfNecessary(test_block_opts_, &dds[i]);
+    });
+  }
+  for (auto& t : threads) {
+    t.join();
+  }
+  for (const auto& s : statuses) {
+    EXPECT_OK(s);
+  }
+  // Check that we added a directory.
+  unordered_set<string> old_dirs(data_dir_group.begin(), data_dir_group.end());
+  ASSERT_OK(dd_manager_->FindDataDirsByTabletId(test_tablet_name_, &data_dir_group));
+  ASSERT_EQ(FLAGS_fs_target_data_dirs_per_tablet + 1, data_dir_group.size());
+  DataDir* new_dir = dds[0];
+  ASSERT_FALSE(ContainsKey(old_dirs, new_dir->dir()));
+
+  // All returned data directories should have been the newly added one.
+  for (int i = 1; i < kNumThreads; i++) {
+    ASSERT_EQ(new_dir, dds[i]);
+  }
+}
+
 TEST_F(DataDirsTest, TestFailedDirNotReturned) {
   FLAGS_fs_target_data_dirs_per_tablet = 2;
   ASSERT_OK(dd_manager_->CreateDataDirGroup(test_tablet_name_));
@@ -204,7 +293,7 @@ TEST_F(DataDirsTest, TestFailedDirNotReturned) {
   DataDir* failed_dd;
   int uuid_idx;
   // Fail one of the directories in the group and verify that it is not used.
-  ASSERT_OK(dd_manager_->GetNextDataDir(test_block_opts_, &failed_dd));
+  ASSERT_OK(dd_manager_->GetDirAddIfNecessary(test_block_opts_, &failed_dd));
   ASSERT_TRUE(dd_manager_->FindUuidIndexByDataDir(failed_dd, &uuid_idx));
   // These calls are idempotent.
   ASSERT_OK(dd_manager_->MarkDataDirFailed(uuid_idx));
@@ -213,7 +302,7 @@ TEST_F(DataDirsTest, TestFailedDirNotReturned) {
   ASSERT_EQ(1, down_cast<AtomicGauge<uint64_t>*>(
         entity_->FindOrNull(METRIC_data_dirs_failed).get())->value());
   for (int i = 0; i < 10; i++) {
-    ASSERT_OK(dd_manager_->GetNextDataDir(test_block_opts_, &dd));
+    ASSERT_OK(dd_manager_->GetDirAddIfNecessary(test_block_opts_, &dd));
     ASSERT_NE(dd, failed_dd);
   }
 
@@ -222,7 +311,7 @@ TEST_F(DataDirsTest, TestFailedDirNotReturned) {
   ASSERT_OK(dd_manager_->MarkDataDirFailed(uuid_idx));
   ASSERT_EQ(2, down_cast<AtomicGauge<uint64_t>*>(
         entity_->FindOrNull(METRIC_data_dirs_failed).get())->value());
-  Status s = dd_manager_->GetNextDataDir(test_block_opts_, &failed_dd);
+  Status s = dd_manager_->GetDirAddIfNecessary(test_block_opts_, &failed_dd);
   ASSERT_TRUE(s.IsIOError());
   ASSERT_STR_CONTAINS(s.ToString(), "No healthy directories exist in tablet's directory group");
 }
diff --git a/src/kudu/fs/data_dirs.cc b/src/kudu/fs/data_dirs.cc
index f365e38..7263c09 100644
--- a/src/kudu/fs/data_dirs.cc
+++ b/src/kudu/fs/data_dirs.cc
@@ -24,7 +24,6 @@
 #include <iterator>
 #include <memory>
 #include <mutex>
-#include <numeric>
 #include <ostream>
 #include <random>
 #include <string>
@@ -116,7 +115,6 @@ namespace fs {
 
 using internal::DataDirGroup;
 using std::default_random_engine;
-using std::iota;
 using std::pair;
 using std::set;
 using std::shuffle;
@@ -948,12 +946,12 @@ Status DataDirManager::CreateDataDirGroup(const string& tablet_id,
   return Status::OK();
 }
 
-Status DataDirManager::GetNextDataDir(const CreateBlockOptions& opts, DataDir** dir) {
+Status DataDirManager::GetDirForBlock(const CreateBlockOptions& opts, DataDir** dir,
+                                      int* new_target_group_size) const {
   shared_lock<rw_spinlock> lock(dir_group_lock_.get_lock());
-  const vector<int>* group_uuid_indices;
-  vector<int> valid_uuid_indices;
-  DataDirGroup* group = nullptr;
-
+  vector<int> healthy_uuid_indices;
+  const DataDirGroup* group = nullptr;
+  DataDirGroup group_for_tests;
   if (PREDICT_TRUE(!opts.tablet_id.empty())) {
     // Get the data dir group for the tablet.
     group = FindOrNull(group_by_tablet_map_, opts.tablet_id);
@@ -961,29 +959,29 @@ Status DataDirManager::GetNextDataDir(const CreateBlockOptions& opts, DataDir**
       return Status::NotFound("Tried to get directory but no directory group "
                               "registered for tablet", opts.tablet_id);
     }
-    if (PREDICT_TRUE(failed_data_dirs_.empty())) {
-      group_uuid_indices = &group->uuid_indices();
-    } else {
-      RemoveUnhealthyDataDirsUnlocked(group->uuid_indices(), &valid_uuid_indices);
-      group_uuid_indices = &valid_uuid_indices;
-      if (valid_uuid_indices.empty()) {
-        return Status::IOError("No healthy directories exist in tablet's "
-                               "directory group", opts.tablet_id, ENODEV);
-      }
-    }
   } else {
     // This should only be reached by some tests; in cases where there is no
     // natural tablet_id, select a data dir from any of the directories.
     CHECK(IsGTest());
-    AppendKeysFromMap(data_dir_by_uuid_idx_, &valid_uuid_indices);
-    group_uuid_indices = &valid_uuid_indices;
+    vector<int> all_uuid_indices;
+    AppendKeysFromMap(data_dir_by_uuid_idx_, &all_uuid_indices);
+    group_for_tests = DataDirGroup(std::move(all_uuid_indices));
+    group = &group_for_tests;
   }
-
-  // In a certain group of dirs, randomly choose two not full dirs and select the one
-  // with more free space.
+  // Within a given directory group, filter out the ones that are failed.
+  if (PREDICT_TRUE(failed_data_dirs_.empty())) {
+    healthy_uuid_indices = group->uuid_indices();
+  } else {
+    RemoveUnhealthyDataDirsUnlocked(group->uuid_indices(), &healthy_uuid_indices);
+    if (healthy_uuid_indices.empty()) {
+      return Status::IOError("No healthy directories exist in tablet's "
+                             "directory group", opts.tablet_id, ENODEV);
+    }
+  }
+  // Within a given directory group, filter out the ones that are full.
   vector<DataDir*> candidate_dirs;
-  for (auto uuid_idx : *group_uuid_indices) {
-    DataDir *candidate = FindOrDie(data_dir_by_uuid_idx_, uuid_idx);
+  for (auto uuid_idx : healthy_uuid_indices) {
+    DataDir* candidate = FindOrDie(data_dir_by_uuid_idx_, uuid_idx);
     Status s = candidate->RefreshAvailableSpace(DataDir::RefreshMode::EXPIRED_ONLY);
     WARN_NOT_OK(s, Substitute("failed to refresh fullness of $0", candidate->dir()));
     if (s.ok() && !candidate->is_full()) {
@@ -991,35 +989,33 @@ Status DataDirManager::GetNextDataDir(const CreateBlockOptions& opts, DataDir**
     }
   }
 
-  if (candidate_dirs.size() == 1) {
-    *dir = candidate_dirs[0];
-    return Status::OK();
-  }
-
-  if (candidate_dirs.size() >= 2) {
-    shuffle(candidate_dirs.begin(), candidate_dirs.end(), default_random_engine(rng_.Next()));
-    *dir = (candidate_dirs[0]->available_bytes() > candidate_dirs[1]->available_bytes()) ?
-            candidate_dirs[0] : candidate_dirs[1];
-    return Status::OK();
-  }
-
-  // All healthy directories are full. Return an error.
-  if (PREDICT_TRUE(!opts.tablet_id.empty())) {
+  // If all the directories in the group are full, return an ENOSPC error.
+  if (PREDICT_FALSE(candidate_dirs.empty())) {
     DCHECK(group);
     size_t num_total = group->uuid_indices().size();
-    size_t num_full = valid_uuid_indices.size();
+    size_t num_full = 0;
+    for (const auto& idx : group->uuid_indices()) {
+      if (FindOrDie(data_dir_by_uuid_idx_, idx)->is_full()) {
+        num_full++;
+      }
+    }
     size_t num_failed = num_total - num_full;
+    *new_target_group_size = num_total + 1;
     return Status::IOError(
         Substitute("No directories available in $0's directory group ($1 dirs "
                    "total, $2 failed, $3 full)",
                    opts.tablet_id, num_total, num_failed, num_full),
         "", ENOSPC);
   }
-
-  return Status::IOError(
-      Substitute("No directories available ($0 dirs, all of which are full)",
-                 valid_uuid_indices.size()),
-      "", ENOSPC);
+  if (candidate_dirs.size() == 1) {
+    *dir = candidate_dirs[0];
+    return Status::OK();
+  }
+  // Pick two randomly and select the one with more space.
+  shuffle(candidate_dirs.begin(), candidate_dirs.end(), default_random_engine(rng_.Next()));
+  *dir = (candidate_dirs[0]->available_bytes() > candidate_dirs[1]->available_bytes()) ?
+          candidate_dirs[0] : candidate_dirs[1];
+  return Status::OK();
 }
 
 void DataDirManager::DeleteDataDirGroup(const std::string& tablet_id) {
@@ -1047,21 +1043,83 @@ Status DataDirManager::GetDataDirGroupPB(const string& tablet_id,
   return Status::OK();
 }
 
+Status DataDirManager::GetDirAddIfNecessary(const CreateBlockOptions& opts, DataDir** dir) {
+  int new_target_group_size = 0;
+  Status s = GetDirForBlock(opts, dir, &new_target_group_size);
+  if (PREDICT_TRUE(s.ok())) {
+    return Status::OK();
+  }
+  const string& tablet_id = opts.tablet_id;
+  if (tablet_id.empty()) {
+    // This should only be reached by some tests, in cases where there is no
+    // natural tablet_id. Just return whatever error we got.
+    CHECK(IsGTest());
+    return s;
+  }
+  // If we failed for a reason other than lack of space in the data dir group,
+  // return the error.
+  if (!s.IsIOError() || s.posix_code() != ENOSPC) {
+    return s;
+  }
+  // If we couldn't get a directory because the group is out of space, try
+  // adding a new directory to the group.
+  DCHECK_GT(new_target_group_size, 0);
+  std::lock_guard<percpu_rwlock> l(dir_group_lock_);
+  const DataDirGroup& group = FindOrDie(group_by_tablet_map_, tablet_id);
+  // If we're already at the new target group size (e.g. because another
+  // thread has added a directory), just return the newly added directory.
+  if (new_target_group_size <= group.uuid_indices().size()) {
+    *dir = FindOrDie(data_dir_by_uuid_idx_, group.uuid_indices().back());
+    return Status::OK();
+  }
+  vector<int> group_uuid_indices = group.uuid_indices();
+  GetDirsForGroupUnlocked(new_target_group_size, &group_uuid_indices);
+  if (PREDICT_FALSE(group_uuid_indices.size() < new_target_group_size)) {
+    // If we couldn't add to the group, return an error.
+    int num_total = data_dirs_.size();
+    int num_failed = failed_data_dirs_.size();
+    int num_full = 0;
+    for (const auto& dd : data_dirs_) {
+      if (dd->is_full()) num_full++;
+    }
+    return Status::IOError(
+        Substitute("No directories could be added ($0 dirs total, $1 failed, $2 full): $3",
+        num_total, num_failed, num_full, s.ToString()), "", ENODEV);
+  }
+  // Update the groups. The tablet ID should not be associated with the data
+  // dir already, and we should update the existing data dir group.
+  // NOTE: it is not the responsibility of the DataDirManager to persist groups
+  // to disk. On-disk representations of data dir groups (e.g. those in the
+  // TabletSuperBlockPB) should be re-written upon modifying them.
+  int new_uuid_idx = group_uuid_indices.back();
+  InsertOrDie(&FindOrDie(tablets_by_uuid_idx_map_, new_uuid_idx), tablet_id);
+  CHECK(!EmplaceOrUpdate(&group_by_tablet_map_, tablet_id, DataDirGroup(group_uuid_indices)));
+  *dir = FindOrDie(data_dir_by_uuid_idx_, new_uuid_idx);
+  LOG(INFO) << Substitute("Added $0 to $1's directory group: $2",
+                          (*dir)->dir(), tablet_id, s.ToString());
+  return Status::OK();
+}
+
 void DataDirManager::GetDirsForGroupUnlocked(int target_size,
                                              vector<int>* group_indices) {
   DCHECK(dir_group_lock_.is_locked());
   vector<int> candidate_indices;
+  unordered_set<int> existing_group_indices(group_indices->begin(), group_indices->end());
   for (auto& e : data_dir_by_uuid_idx_) {
-    if (ContainsKey(failed_data_dirs_, e.first)) {
+    int uuid_idx = e.first;
+    DCHECK_LT(uuid_idx, data_dirs_.size());
+    if (ContainsKey(existing_group_indices, uuid_idx) ||
+        ContainsKey(failed_data_dirs_, uuid_idx)) {
       continue;
     }
-    Status s = e.second->RefreshAvailableSpace(DataDir::RefreshMode::ALWAYS);
-    WARN_NOT_OK(s, Substitute("failed to refresh fullness of $0", e.second->dir()));
-    if (s.ok() && !e.second->is_full()) {
+    DataDir* dd = e.second;
+    Status s = dd->RefreshAvailableSpace(DataDir::RefreshMode::ALWAYS);
+    WARN_NOT_OK(s, Substitute("failed to refresh fullness of $0", dd->dir()));
+    if (s.ok() && !dd->is_full()) {
       // TODO(awong): If a disk is unhealthy at the time of group creation, the
       // resulting group may be below targeted size. Add functionality to
       // resize groups. See KUDU-2040 for more details.
-      candidate_indices.push_back(e.first);
+      candidate_indices.push_back(uuid_idx);
     }
   }
   while (group_indices->size() < target_size && !candidate_indices.empty()) {
@@ -1200,7 +1258,6 @@ void DataDirManager::RemoveUnhealthyDataDirsUnlocked(const vector<int>& uuid_ind
   }
   healthy_indices->clear();
   for (int uuid_idx : uuid_indices) {
-    DCHECK_LT(uuid_idx, data_dirs_.size());
     if (!ContainsKey(failed_data_dirs_, uuid_idx)) {
       healthy_indices->emplace_back(uuid_idx);
     }
diff --git a/src/kudu/fs/data_dirs.h b/src/kudu/fs/data_dirs.h
index b7f7f0f..0082f99 100644
--- a/src/kudu/fs/data_dirs.h
+++ b/src/kudu/fs/data_dirs.h
@@ -335,10 +335,10 @@ class DataDirManager {
   // and data dir to tablet set are cleared of all references to the tablet.
   void DeleteDataDirGroup(const std::string& tablet_id);
 
-  // Returns a random directory, giving preference to the one with more free
-  // space in the specified option's data dir group. If there is no room in the
-  // group, returns an error.
-  Status GetNextDataDir(const CreateBlockOptions& opts, DataDir** dir);
+  // Returns a dir for block placement in the data dir group specified in
+  // 'opts'. If none exists, adds a new dir to the group and returns the dir,
+  // and if none can be added, returns an error.
+  Status GetDirAddIfNecessary(const CreateBlockOptions& opts, DataDir** dir);
 
   // Finds the set of tablet_ids in the data dir specified by 'uuid_idx' and
   // returns a copy, returning an empty set if none are found.
@@ -464,8 +464,16 @@ class DataDirManager {
       std::vector<std::unique_ptr<PathInstanceMetadataFile>> instances_to_update,
       std::vector<std::string> new_all_uuids);
 
+  // Returns a random directory in the data dir group specified in 'opts',
+  // giving preference to those with more free space. If there is no room in
+  // the group, returns an IOError with the ENOSPC posix code and sets
+  // 'new_target_group_size' to the new target size for the data dir group.
+  Status GetDirForBlock(const CreateBlockOptions& opts, DataDir** dir,
+                        int* new_target_group_size) const;
+
   // Repeatedly selects directories from those available to put into a new
   // DataDirGroup until 'group_indices' reaches 'target_size' elements.
+  //
   // Selection is based on "The Power of Two Choices in Randomized Load
   // Balancing", selecting two directories randomly and choosing the one with
   // less load, quantified as the number of unique tablets in the directory.
@@ -473,10 +481,11 @@ class DataDirManager {
   // resulting behavior fills directories that have fewer tablets stored on
   // them while not completely neglecting those with more tablets.
   //
-  // 'group_indices' is an output that stores the list of uuid_indices to be
-  // added. Although this function does not itself change DataDirManager state,
-  // its expected usage warrants that it is called within the scope of a
-  // lock_guard of dir_group_lock_.
+  // 'group_indices' is an in/out parameter that stores the list of UUID
+  // indices to be added; UUID indices that are already in 'group_indices' are
+  // not considered. Although this function does not itself change
+  // DataDirManager state, its expected usage warrants that it is called within
+  // the scope of a lock_guard of dir_group_lock_.
   void GetDirsForGroupUnlocked(int target_size, std::vector<int>* group_indices);
 
   // Goes through the data dirs in 'uuid_indices' and populates
@@ -529,7 +538,7 @@ class DataDirManager {
   mutable percpu_rwlock dir_group_lock_;
 
   // RNG used to select directories.
-  ThreadSafeRandom rng_;
+  mutable ThreadSafeRandom rng_;
 
   DISALLOW_COPY_AND_ASSIGN(DataDirManager);
 };
diff --git a/src/kudu/fs/file_block_manager.cc b/src/kudu/fs/file_block_manager.cc
index 1b92d57..6686606 100644
--- a/src/kudu/fs/file_block_manager.cc
+++ b/src/kudu/fs/file_block_manager.cc
@@ -748,7 +748,7 @@ Status FileBlockManager::CreateBlock(const CreateBlockOptions& opts,
   CHECK(!opts_.read_only);
 
   DataDir* dir;
-  RETURN_NOT_OK_EVAL(dd_manager_->GetNextDataDir(opts, &dir),
+  RETURN_NOT_OK_EVAL(dd_manager_->GetDirAddIfNecessary(opts, &dir),
       error_manager_->RunErrorNotificationCb(ErrorHandlerType::NO_AVAILABLE_DISKS, opts.tablet_id));
   int uuid_idx;
   CHECK(dd_manager_->FindUuidIndexByDataDir(dir, &uuid_idx));
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index 38e7b90..a06f319 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -2130,7 +2130,7 @@ void LogBlockManager::RemoveDeadContainer(const string& container_name) {
 Status LogBlockManager::GetOrCreateContainer(const CreateBlockOptions& opts,
                                              LogBlockContainerRefPtr* container) {
   DataDir* dir;
-  RETURN_NOT_OK_EVAL(dd_manager_->GetNextDataDir(opts, &dir),
+  RETURN_NOT_OK_EVAL(dd_manager_->GetDirAddIfNecessary(opts, &dir),
       error_manager_->RunErrorNotificationCb(ErrorHandlerType::NO_AVAILABLE_DISKS, opts.tablet_id));
 
   {
diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc
index b72c5d7..a7bdecd 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -28,6 +28,7 @@
 #include <sstream>
 #include <string>
 #include <thread>
+#include <unordered_set>
 #include <utility>
 #include <vector>
 
@@ -56,6 +57,7 @@
 #include "kudu/consensus/metadata.pb.h"
 #include "kudu/consensus/raft_consensus.h"
 #include "kudu/fs/block_id.h"
+#include "kudu/fs/data_dirs.h"
 #include "kudu/fs/fs-test-util.h"
 #include "kudu/fs/fs.pb.h"
 #include "kudu/fs/fs_manager.h"
@@ -120,6 +122,7 @@ using kudu::clock::Clock;
 using kudu::clock::HybridClock;
 using kudu::consensus::ConsensusStatePB;
 using kudu::fs::CreateCorruptBlock;
+using kudu::fs::DataDirManager;
 using kudu::pb_util::SecureDebugString;
 using kudu::pb_util::SecureShortDebugString;
 using kudu::rpc::Messenger;
@@ -137,6 +140,7 @@ using std::shared_ptr;
 using std::string;
 using std::thread;
 using std::unique_ptr;
+using std::unordered_set;
 using std::vector;
 using strings::Substitute;
 
@@ -157,8 +161,11 @@ DECLARE_bool(fail_dns_resolution);
 DECLARE_bool(rowset_metadata_store_keys);
 DECLARE_double(cfile_inject_corruption);
 DECLARE_double(env_inject_eio);
+DECLARE_double(env_inject_full);
 DECLARE_int32(flush_threshold_mb);
 DECLARE_int32(flush_threshold_secs);
+DECLARE_int32(fs_data_dirs_available_space_cache_seconds);
+DECLARE_int32(fs_target_data_dirs_per_tablet);
 DECLARE_int32(maintenance_manager_num_threads);
 DECLARE_int32(metrics_retirement_age_ms);
 DECLARE_int32(scanner_batch_size_rows);
@@ -166,6 +173,7 @@ DECLARE_int32(scanner_gc_check_interval_us);
 DECLARE_int32(scanner_ttl_ms);
 DECLARE_string(block_manager);
 DECLARE_string(env_inject_eio_globs);
+DECLARE_string(env_inject_full_globs);
 
 // Declare these metrics prototypes for simpler unit testing of their behavior.
 METRIC_DECLARE_counter(block_manager_total_bytes_read);
@@ -574,6 +582,66 @@ TEST_F(TabletServerTest, TestTombstonedTabletOnWebUI) {
   ASSERT_STR_NOT_CONTAINS(s, mini_server_->bound_rpc_addr().ToString());
 }
 
+class TabletServerDiskSpaceTest : public TabletServerTestBase,
+                                  public testing::WithParamInterface<string> {
+ public:
+  void SetUp() override {
+    FLAGS_block_manager = GetParam();
+    NO_FATALS(TabletServerTestBase::SetUp());
+    NO_FATALS(StartTabletServer(/*num_data_dirs=*/kNumDirs));
+  }
+ protected:
+  const int kNumDirs = FLAGS_fs_target_data_dirs_per_tablet + 1;
+};
+
+// Test that when there isn't enough space in a tablet's data directory group
+// and there are additional directories available, directories are added to the
+// group, and the new groups are persisted to disk.
+TEST_P(TabletServerDiskSpaceTest, TestFullGroupAddsDir) {
+  DataDirManager* dd_manager = mini_server_->server()->fs_manager()->dd_manager();
+  vector<string> dir_group;
+  ASSERT_OK(dd_manager->FindDataDirsByTabletId(kTabletId, &dir_group));
+  ASSERT_EQ(kNumDirs - 1, dir_group.size());
+  FLAGS_fs_data_dirs_available_space_cache_seconds = 0;
+  FLAGS_env_inject_full_globs = JoinStrings(dir_group, ",");
+  FLAGS_env_inject_full = 1.0;
+
+  // Insert some data and flush. This should lead to the creation of a block,
+  // and the addition of a new directory in the dir group.
+  unordered_set<string> old_group(dir_group.begin(), dir_group.end());
+  NO_FATALS(InsertTestRowsRemote(1, 1));
+  ASSERT_OK(tablet_replica_->tablet()->Flush());
+  ASSERT_OK(dd_manager->FindDataDirsByTabletId(kTabletId, &dir_group));
+  ASSERT_EQ(kNumDirs, dir_group.size());
+
+  // Grab the newly added directory and check that failing it means the tablet
+  // is in a failed directory.
+  string new_dir;
+  for (const auto& dir : dir_group) {
+    if (!ContainsKey(old_group, dir)) {
+      new_dir = dir;
+      break;
+    }
+  }
+  ASSERT_FALSE(new_dir.empty());
+  string new_uuid;
+  ASSERT_TRUE(dd_manager->FindUuidByRoot(DirName(new_dir), &new_uuid));
+  dd_manager->MarkDataDirFailedByUuid(new_uuid);
+  ASSERT_TRUE(dd_manager->IsTabletInFailedDir(kTabletId));
+
+  // The group should be updated even after restarting the tablet server.
+  NO_FATALS(ShutdownAndRebuildTablet(kNumDirs));
+  dd_manager = mini_server_->server()->fs_manager()->dd_manager();
+  ASSERT_OK(dd_manager->FindDataDirsByTabletId(kTabletId, &dir_group));
+  ASSERT_EQ(kNumDirs, dir_group.size());
+  ASSERT_TRUE(dd_manager->FindUuidByRoot(DirName(new_dir), &new_uuid));
+  dd_manager->MarkDataDirFailedByUuid(new_uuid);
+  ASSERT_TRUE(dd_manager->IsTabletInFailedDir(kTabletId));
+}
+
+INSTANTIATE_TEST_CASE_P(BlockManager, TabletServerDiskSpaceTest,
+    ::testing::Values("log", "file"));
+
 enum class ErrorType {
   DISK_FAILURE,
   CFILE_CORRUPTION
diff --git a/src/kudu/util/env_util.cc b/src/kudu/util/env_util.cc
index 2fc3b1d..6fb8b8a 100644
--- a/src/kudu/util/env_util.cc
+++ b/src/kudu/util/env_util.cc
@@ -17,9 +17,11 @@
 
 #include "kudu/util/env_util.h"
 
+#include <fnmatch.h>
+
 #include <algorithm>
-#include <cstdint>
 #include <cerrno>
+#include <cstdint>
 #include <ctime>
 #include <memory>
 #include <string>
@@ -33,12 +35,14 @@
 #include "kudu/gutil/bind.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/split.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/strings/util.h"
 #include "kudu/util/env.h"
+#include "kudu/util/fault_injection.h"
 #include "kudu/util/flag_tags.h"
-#include "kudu/util/slice.h"
 #include "kudu/util/path_util.h"
+#include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 
 DEFINE_int64(disk_reserved_bytes_free_for_testing, -1,
@@ -70,6 +74,18 @@ TAG_FLAG(disk_reserved_override_prefix_2_bytes_free_for_testing, unsafe);
 TAG_FLAG(disk_reserved_override_prefix_1_bytes_free_for_testing, runtime);
 TAG_FLAG(disk_reserved_override_prefix_2_bytes_free_for_testing, runtime);
 
+DEFINE_double(env_inject_full, 0.0,
+              "Fraction of the time that space checks on certain paths will "
+              "yield the posix code ENOSPC.");
+TAG_FLAG(env_inject_full, hidden);
+
+DEFINE_string(env_inject_full_globs, "*",
+              "Comma-separated list of glob patterns specifying which paths "
+              "return with space errors.");
+TAG_FLAG(env_inject_full_globs, hidden);
+
+
+using kudu::fault_injection::MaybeTrue;
 using std::pair;
 using std::shared_ptr;
 using std::string;
@@ -78,7 +94,25 @@ using std::unordered_set;
 using std::vector;
 using strings::Substitute;
 
+
 namespace kudu {
+
+namespace {
+// Returns whether the path specified by 'data_dir' should be considered full.
+bool ShouldInjectSpaceError(const string& data_dir) {
+  if (PREDICT_FALSE(MaybeTrue(FLAGS_env_inject_full))) {
+    vector<string> globs =
+        strings::Split(FLAGS_env_inject_full_globs, ",", strings::SkipEmpty());
+    for (const auto& glob : globs) {
+      if (fnmatch(glob.c_str(), data_dir.c_str(), 0) == 0) {
+        return true;
+      }
+    }
+  }
+  return false;
+}
+} // anonymous namespace
+
 namespace env_util {
 
 Status OpenFileForWrite(Env* env, const string& path,
@@ -129,9 +163,14 @@ static void OverrideBytesFreeWithTestingFlags(const string& path, int64_t* bytes
 
 Status VerifySufficientDiskSpace(Env *env, const std::string& path, int64_t requested_bytes,
                                  int64_t reserved_bytes, int64_t* available_bytes) {
-  const int64_t kOnePercentReservation = -1;
   DCHECK_GE(requested_bytes, 0);
-
+  if (ShouldInjectSpaceError(path)) {
+    if (available_bytes) {
+      *available_bytes = 0;
+    }
+    return Status::IOError(Env::kInjectedFailureStatusMsg, "", ENOSPC);
+  }
+  const int64_t kOnePercentReservation = -1;
   SpaceInfo space_info;
   RETURN_NOT_OK(env->GetSpaceInfo(path, &space_info));
   int64_t free_bytes = space_info.free_bytes;
@@ -150,7 +189,7 @@ Status VerifySufficientDiskSpace(Env *env, const std::string& path, int64_t requ
     reserved_bytes = space_info.capacity_bytes / 100;
   }
 
-  if (available_bytes != nullptr) {
+  if (available_bytes) {
     *available_bytes = free_bytes;
   }