You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by dr...@apache.org on 2017/06/05 17:38:44 UTC

[2/2] kudu git commit: KUDU-1952 Remove round-robin for block placement

KUDU-1952 Remove round-robin for block placement

This is the first of a multi-patch patchset to mitigate the effects of
single-disk failure. Throughout the code, the term "DataDir" refers to
a data directory, which is often mounted on a distinct disk. Thus,
"disks" and "data directories" will be used interchangeably.

This patch adds a mapping from tablet to a set of disks and uses it to
replace the existing round-robin placement of blocks. Tablets are
mapped to a fixed number of disks (i.e. a DataDirGroup). New blocks
are placed randomly in directories within each tablet's DataDirGroup.

Tablet-to-group mappings are generated and stored as metadata upon
tablet creation, or upon tablet replacement during a tablet copy.
During group creation, disks are added to groups by randomly selecting
two available directories and selecting the one with fewer tablets on
it ("Power of Two Random Choices"). This avoids pigeonholing new tablets
to disks with relatively few tablets, while still trending towards
filling underloaded disks.

Groups are maintained when restarting the server, as they are flushed
with metadata, and are deleted upon tablet deletion. When loading
tablet data from a previous version of Kudu, the tablet's metadata
will not have a DataDirGroup. One will be generated containing all
data directories, as the tablet's data may already be spread across
any number of disks.

As this patch only addresses block placement, it does not itself
mitigate the effects of single-disk failure. Given this, and given the
tradeoff between I/O and disk-failure tolerance, the default behavior
will be to spread tablet data across all disks.

Testing is done at the block manager level in block_manager-test and
log_block_manager-test, as well as in the new data_dirs-test.

The implementation of block placement is compared against a Python
implementation found here:
https://gist.github.com/andrwng/7c24e8e26aec68c50741f92eb6f2e48d

Sweeping over a few parameters shows nearly identical standard
deviations in the distribution of tablets across directories for the
two implementations.

https://github.com/andrwng/kudu/blob/po2c/docs/images/10_20_3_10k_cpp.png
https://github.com/andrwng/kudu/blob/po2c/docs/images/10_20_3_10k_py.png

https://github.com/andrwng/kudu/blob/po2c/docs/images/30_10_5_5k_cpp.png
https://github.com/andrwng/kudu/blob/po2c/docs/images/30_10_5_5k_py.png

https://github.com/andrwng/kudu/blob/po2c/docs/images/30_200_5_5k_cpp.png
https://github.com/andrwng/kudu/blob/po2c/docs/images/30_200_5_5k_py.png

A design doc can be found here:
https://docs.google.com/document/d/1zZk-vb_ETKUuePcZ9ZqoSK2oPvAAaEV1sjDXes8Pxgk/edit?usp=sharing

Change-Id: I9828147f4fa5c4d7f6ed23441dca5a116b8cb11b
Reviewed-on: http://gerrit.cloudera.org:8080/6636
Reviewed-by: David Ribeiro Alves <da...@gmail.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/732ee211
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/732ee211
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/732ee211

Branch: refs/heads/master
Commit: 732ee211a31335a1b146428b15887d1f26468a3a
Parents: ce83b6c
Author: Andrew Wong <aw...@cloudera.com>
Authored: Thu May 4 10:51:53 2017 -0700
Committer: David Ribeiro Alves <da...@gmail.com>
Committed: Mon Jun 5 17:27:44 2017 +0000

----------------------------------------------------------------------
 src/kudu/cfile/bloomfile-test-base.h        |   2 +-
 src/kudu/cfile/cfile-test-base.h            |   2 +-
 src/kudu/cfile/cfile-test.cc                |  10 +-
 src/kudu/fs/CMakeLists.txt                  |   1 +
 src/kudu/fs/block_manager-stress-test.cc    |  14 +-
 src/kudu/fs/block_manager-test.cc           | 277 ++++++++++++++++++----
 src/kudu/fs/block_manager.h                 |  13 +-
 src/kudu/fs/data_dirs-test.cc               | 281 +++++++++++++++++++++++
 src/kudu/fs/data_dirs.cc                    | 176 +++++++++++---
 src/kudu/fs/data_dirs.h                     | 138 ++++++++++-
 src/kudu/fs/file_block_manager.cc           |   6 +-
 src/kudu/fs/file_block_manager.h            |  10 +-
 src/kudu/fs/fs.proto                        |   7 +
 src/kudu/fs/fs_manager-test.cc              |   2 +-
 src/kudu/fs/fs_manager.cc                   |  10 +-
 src/kudu/fs/fs_manager.h                    |  12 +-
 src/kudu/fs/log_block_manager-test.cc       |  60 +++--
 src/kudu/fs/log_block_manager.cc            |  11 +-
 src/kudu/fs/log_block_manager.h             |  19 +-
 src/kudu/tablet/delta_compaction-test.cc    |   2 +-
 src/kudu/tablet/delta_compaction.cc         |  18 +-
 src/kudu/tablet/delta_compaction.h          |   6 +-
 src/kudu/tablet/delta_tracker.cc            |   7 +-
 src/kudu/tablet/deltafile-test.cc           |   6 +-
 src/kudu/tablet/deltamemstore-test.cc       |   2 +-
 src/kudu/tablet/diskrowset.cc               |  21 +-
 src/kudu/tablet/metadata.proto              |   6 +
 src/kudu/tablet/multi_column_writer.cc      |  10 +-
 src/kudu/tablet/multi_column_writer.h       |  11 +-
 src/kudu/tablet/tablet_bootstrap-test.cc    |   6 +
 src/kudu/tablet/tablet_metadata.cc          |  54 ++++-
 src/kudu/tools/kudu-tool-test.cc            |   2 +-
 src/kudu/tserver/tablet_copy_client-test.cc |   3 +-
 src/kudu/tserver/tablet_copy_client.cc      |  15 +-
 src/kudu/tserver/tablet_copy_client.h       |   3 +
 src/kudu/tserver/ts_tablet_manager.cc       |  26 ++-
 36 files changed, 1050 insertions(+), 199 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/732ee211/src/kudu/cfile/bloomfile-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/bloomfile-test-base.h b/src/kudu/cfile/bloomfile-test-base.h
index aceb536..774de92 100644
--- a/src/kudu/cfile/bloomfile-test-base.h
+++ b/src/kudu/cfile/bloomfile-test-base.h
@@ -71,7 +71,7 @@ class BloomFileTestBase : public KuduTest {
 
   void WriteTestBloomFile() {
     std::unique_ptr<WritableBlock> sink;
-    ASSERT_OK(fs_manager_->CreateNewBlock(&sink));
+    ASSERT_OK(fs_manager_->CreateNewBlock({}, &sink));
     block_id_ = sink->id();
 
     // Set sizing based on flags

http://git-wip-us.apache.org/repos/asf/kudu/blob/732ee211/src/kudu/cfile/cfile-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/cfile-test-base.h b/src/kudu/cfile/cfile-test-base.h
index 58b995d..fa49a99 100644
--- a/src/kudu/cfile/cfile-test-base.h
+++ b/src/kudu/cfile/cfile-test-base.h
@@ -350,7 +350,7 @@ class CFileTestBase : public KuduTest {
                      uint32_t flags,
                      BlockId* block_id) {
     unique_ptr<WritableBlock> sink;
-    ASSERT_OK(fs_manager_->CreateNewBlock(&sink));
+    ASSERT_OK(fs_manager_->CreateNewBlock({}, &sink));
     *block_id = sink->id();
     WriterOptions opts;
     opts.write_posidx = true;

http://git-wip-us.apache.org/repos/asf/kudu/blob/732ee211/src/kudu/cfile/cfile-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/cfile-test.cc b/src/kudu/cfile/cfile-test.cc
index 72a91fe..2ab6b2b 100644
--- a/src/kudu/cfile/cfile-test.cc
+++ b/src/kudu/cfile/cfile-test.cc
@@ -214,7 +214,7 @@ class TestCFile : public CFileTestBase {
   void TestReadWriteRawBlocks(CompressionType compression, int num_entries) {
     // Test Write
     unique_ptr<WritableBlock> sink;
-    ASSERT_OK(fs_manager_->CreateNewBlock(&sink));
+    ASSERT_OK(fs_manager_->CreateNewBlock({}, &sink));
     BlockId id = sink->id();
     WriterOptions opts;
     opts.write_posidx = true;
@@ -337,7 +337,7 @@ class TestCFile : public CFileTestBase {
     uint8_t corrupt = orig ^ (static_cast<uint8_t>(1) << flip_bit);
     data.mutable_data()[corrupt_offset] = corrupt;
     unique_ptr<fs::WritableBlock> writer;
-    RETURN_NOT_OK(fs_manager_->CreateNewBlock(&writer));
+    RETURN_NOT_OK(fs_manager_->CreateNewBlock({}, &writer));
     RETURN_NOT_OK(writer->Append(data));
     RETURN_NOT_OK(writer->Close());
 
@@ -715,7 +715,7 @@ TEST_P(TestCFileBothCacheTypes, TestMetadata) {
   // Write the file.
   {
     unique_ptr<WritableBlock> sink;
-    ASSERT_OK(fs_manager_->CreateNewBlock(&sink));
+    ASSERT_OK(fs_manager_->CreateNewBlock({}, &sink));
     block_id = sink->id();
     WriterOptions opts;
     CFileWriter w(opts, GetTypeInfo(INT32), false, std::move(sink));
@@ -823,7 +823,7 @@ TEST_P(TestCFileBothCacheTypes, TestDataCorruption) {
 
   // Write some data
   unique_ptr<WritableBlock> sink;
-  ASSERT_OK(fs_manager_->CreateNewBlock(&sink));
+  ASSERT_OK(fs_manager_->CreateNewBlock({}, &sink));
   BlockId id = sink->id();
   WriterOptions opts;
   opts.write_posidx = true;
@@ -887,7 +887,7 @@ TEST_P(TestCFileBothCacheTypes, TestNullDictStrings) {
 
 TEST_P(TestCFileBothCacheTypes, TestReleaseBlock) {
   unique_ptr<WritableBlock> sink;
-  ASSERT_OK(fs_manager_->CreateNewBlock(&sink));
+  ASSERT_OK(fs_manager_->CreateNewBlock({}, &sink));
   ASSERT_EQ(WritableBlock::CLEAN, sink->state());
   WriterOptions opts;
   CFileWriter w(opts, GetTypeInfo(STRING), false, std::move(sink));

http://git-wip-us.apache.org/repos/asf/kudu/blob/732ee211/src/kudu/fs/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/fs/CMakeLists.txt b/src/kudu/fs/CMakeLists.txt
index 78de838..0074082 100644
--- a/src/kudu/fs/CMakeLists.txt
+++ b/src/kudu/fs/CMakeLists.txt
@@ -55,6 +55,7 @@ set(KUDU_TEST_LINK_LIBS kudu_fs kudu_fs_test_util ${KUDU_MIN_TEST_LIBS})
 ADD_KUDU_TEST(block_manager-test)
 ADD_KUDU_TEST(block_manager_util-test)
 ADD_KUDU_TEST(block_manager-stress-test RUN_SERIAL true)
+ADD_KUDU_TEST(data_dirs-test)
 ADD_KUDU_TEST(fs_manager-test)
 if (NOT APPLE)
   # Will only pass on Linux.

http://git-wip-us.apache.org/repos/asf/kudu/blob/732ee211/src/kudu/fs/block_manager-stress-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager-stress-test.cc b/src/kudu/fs/block_manager-stress-test.cc
index ed5b969..458f3fe 100644
--- a/src/kudu/fs/block_manager-stress-test.cc
+++ b/src/kudu/fs/block_manager-stress-test.cc
@@ -22,6 +22,7 @@
 #include <unordered_map>
 #include <vector>
 
+#include "kudu/fs/fs.pb.h"
 #include "kudu/fs/file_block_manager.h"
 #include "kudu/fs/fs_report.h"
 #include "kudu/fs/log_block_manager.h"
@@ -91,6 +92,7 @@ class BlockManagerStressTest : public KuduTest {
   BlockManagerStressTest() :
     rand_seed_(SeedRandom()),
     stop_latch_(1),
+    test_tablet_name_("test_tablet"),
     total_blocks_written_(0),
     total_bytes_written_(0),
     total_blocks_read_(0),
@@ -124,6 +126,8 @@ class BlockManagerStressTest : public KuduTest {
   virtual void SetUp() OVERRIDE {
     CHECK_OK(bm_->Create());
     CHECK_OK(bm_->Open(nullptr));
+    CHECK_OK(bm_->dd_manager()->CreateDataDirGroup(test_tablet_name_));
+    CHECK(bm_->dd_manager()->GetDataDirGroupPB(test_tablet_name_, &test_group_pb_));
   }
 
   virtual void TearDown() OVERRIDE {
@@ -223,6 +227,12 @@ class BlockManagerStressTest : public KuduTest {
   // The block manager.
   gscoped_ptr<BlockManager> bm_;
 
+  // Test group of disk to spread data across.
+  DataDirGroupPB test_group_pb_;
+
+  // Test tablet name.
+  string test_tablet_name_;
+
   // The running threads.
   vector<scoped_refptr<Thread> > threads_;
 
@@ -251,7 +261,7 @@ void BlockManagerStressTest<T>::WriterThread() {
     // Create the blocks and write out the PRNG seeds.
     for (int i = 0; i < FLAGS_block_group_size; i++) {
       unique_ptr<WritableBlock> block;
-      CHECK_OK(bm_->CreateBlock(&block));
+      CHECK_OK(bm_->CreateBlock(CreateBlockOptions({ test_tablet_name_ }), &block));
 
       const uint32_t seed = rand.Next() + 1;
       Slice seed_slice(reinterpret_cast<const uint8_t*>(&seed), sizeof(seed));
@@ -465,6 +475,8 @@ TYPED_TEST(BlockManagerStressTest, StressTest) {
   this->bm_.reset(this->CreateBlockManager());
   FsReport report;
   ASSERT_OK(this->bm_->Open(&report));
+  ASSERT_OK(this->bm_->dd_manager()->LoadDataDirGroupFromPB(this->test_tablet_name_,
+                                                            this->test_group_pb_));
   ASSERT_OK(report.LogAndCheckForFatalErrors());
   this->RunTest(FLAGS_test_duration_secs / 2);
   checker.Stop();

http://git-wip-us.apache.org/repos/asf/kudu/blob/732ee211/src/kudu/fs/block_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager-test.cc b/src/kudu/fs/block_manager-test.cc
index 818c9f2..0040cdf 100644
--- a/src/kudu/fs/block_manager-test.cc
+++ b/src/kudu/fs/block_manager-test.cc
@@ -20,6 +20,9 @@
 #include <string>
 #include <vector>
 
+#include <google/protobuf/util/message_differencer.h>
+
+#include "kudu/fs/data_dirs.h"
 #include "kudu/fs/file_block_manager.h"
 #include "kudu/fs/fs.pb.h"
 #include "kudu/fs/fs_report.h"
@@ -38,6 +41,7 @@
 #include "kudu/util/test_util.h"
 #include "kudu/util/thread.h"
 
+using google::protobuf::util::MessageDifferencer;
 using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
@@ -50,6 +54,7 @@ DECLARE_uint64(log_container_max_size);
 DECLARE_int64(fs_data_dirs_reserved_bytes);
 DECLARE_int64(disk_reserved_bytes_free_for_testing);
 DECLARE_int32(fs_data_dirs_full_disk_cache_seconds);
+DECLARE_uint32(fs_target_data_dirs_per_tablet);
 DECLARE_string(block_manager);
 DECLARE_double(env_inject_io_error);
 
@@ -76,10 +81,14 @@ METRIC_DECLARE_gauge_uint64(data_dirs_full);
 namespace kudu {
 namespace fs {
 
+static const char* kTestData = "test data";
+
 template <typename T>
 class BlockManagerTest : public KuduTest {
  public:
   BlockManagerTest() :
+    test_tablet_name_("test_tablet"),
+    test_block_opts_(CreateBlockOptions({ test_tablet_name_ })),
     bm_(CreateBlockManager(scoped_refptr<MetricEntity>(),
                            shared_ptr<MemTracker>(),
                            { test_dir_ })) {
@@ -91,6 +100,27 @@ class BlockManagerTest : public KuduTest {
     // unnecessarily.
     FsReport report;
     CHECK_OK(bm_->Open(&report));
+    CHECK_OK(bm_->dd_manager()->CreateDataDirGroup(test_tablet_name_));
+    CHECK(bm_->dd_manager()->GetDataDirGroupPB(test_tablet_name_, &test_group_pb_));
+  }
+
+  void DistributeBlocksAcrossDirs(int num_dirs, int num_blocks_per_dir) {
+    // Create a data directory group that contains 'num_dirs' data directories.
+    string tablet_name = Substitute("$0_disks", num_dirs);
+    CreateBlockOptions opts({ tablet_name });
+    FLAGS_fs_target_data_dirs_per_tablet = num_dirs;
+    ASSERT_OK(bm_->dd_manager()->CreateDataDirGroup(tablet_name));
+    int num_blocks = num_dirs * num_blocks_per_dir;
+
+    // Write 'num_blocks' blocks to this data dir group.
+    ScopedWritableBlockCloser closer;
+    for (int i = 0; i < num_blocks; i++) {
+      unique_ptr<WritableBlock> written_block;
+      ASSERT_OK(bm_->CreateBlock(opts, &written_block));
+      ASSERT_OK(written_block->Append(kTestData));
+      closer.AddBlock(std::move(written_block));
+    }
+    ASSERT_OK(closer.CloseBlocks());
   }
 
  protected:
@@ -107,12 +137,19 @@ class BlockManagerTest : public KuduTest {
   Status ReopenBlockManager(const scoped_refptr<MetricEntity>& metric_entity,
                             const shared_ptr<MemTracker>& parent_mem_tracker,
                             const vector<string>& paths,
-                            bool create) {
+                            bool create,
+                            bool load_test_group = true) {
     bm_.reset(CreateBlockManager(metric_entity, parent_mem_tracker, paths));
     if (create) {
       RETURN_NOT_OK(bm_->Create());
     }
     RETURN_NOT_OK(bm_->Open(nullptr));
+
+    // Certain tests may maintain their own directory groups, in which case
+    // the default test group should not be used.
+    if (load_test_group) {
+      RETURN_NOT_OK(bm_->dd_manager()->LoadDataDirGroupFromPB(test_tablet_name_, test_group_pb_));
+    }
     return Status::OK();
   }
 
@@ -120,6 +157,33 @@ class BlockManagerTest : public KuduTest {
 
   void RunMemTrackerTest();
 
+  void RunBlockDistributionTest(const vector<string>& paths);
+
+  static Status CountFilesCb(int* num_files, Env::FileType type,
+                      const string& /*dirname*/,
+                      const string& basename) {
+    if (basename == kInstanceMetadataFileName) {
+      return Status::OK();
+    }
+    if (type == Env::FILE_TYPE) {
+      *num_files += 1;
+    }
+    return Status::OK();
+  }
+
+  // Utility function that counts the number of files within a directory
+  // hierarchy, ignoring '.', '..', and file 'kInstanceMetadataFileName'.
+  Status CountFiles(const string& root, int* num_files) {
+    *num_files = 0;
+    RETURN_NOT_OK(env_->Walk(root, Env::PRE_ORDER,
+                             Bind(BlockManagerTest::CountFilesCb, num_files)));
+    return Status::OK();
+  }
+
+  // Keep an internal copy of the data dir group to act as metadata.
+  DataDirGroupPB test_group_pb_;
+  string test_tablet_name_;
+  CreateBlockOptions test_block_opts_;
   gscoped_ptr<T> bm_;
 };
 
@@ -131,6 +195,72 @@ void BlockManagerTest<LogBlockManager>::SetUp() {
   // unnecessarily.
   FsReport report;
   CHECK_OK(bm_->Open(&report));
+  CHECK_OK(bm_->dd_manager()->CreateDataDirGroup(test_tablet_name_));
+
+  // Store the DataDirGroupPB for tests that reopen the block manager.
+  CHECK(bm_->dd_manager()->GetDataDirGroupPB(test_tablet_name_, &test_group_pb_));
+}
+
+template <>
+void BlockManagerTest<FileBlockManager>::RunBlockDistributionTest(const vector<string>& paths) {
+  vector<int> blocks_in_each_path(paths.size());
+  // Writes 'num_blocks_per_dir' per directory. This will not necessarily write exactly
+  // 'num_blocks_per_dir' in each directory, but will ensure blocks are written to all directories
+  // with a high probability.
+  // Running this 5000 times yielded no failures.
+  int num_blocks_per_dir = 30;
+
+  // Spread across 1, then 3, then 5 data directories.
+  for (int d: { 1, 3, 5 }) {
+    DistributeBlocksAcrossDirs(d, num_blocks_per_dir);
+    // Iterate through the data directories, counting the number of blocks in each directory, and
+    // comparing against the number of blocks in that data directory in the previous iteration. A
+    // difference indicates that new blocks have been written to the directory.
+    int num_paths_added_to = 0;
+    int total_blocks_across_paths = 0;
+    for (int path_idx = 0; path_idx < paths.size(); path_idx++) {
+      int num_blocks = 0;
+      ASSERT_OK(CountFiles(paths[path_idx], &num_blocks));
+      int new_blocks = num_blocks - blocks_in_each_path[path_idx];
+      if (new_blocks > 0) {
+        num_paths_added_to++;
+        total_blocks_across_paths += new_blocks;
+        blocks_in_each_path[path_idx] = num_blocks;
+      }
+    }
+    ASSERT_EQ(d * num_blocks_per_dir, total_blocks_across_paths);
+    ASSERT_EQ(d, num_paths_added_to);
+  }
+}
+
+template <>
+void BlockManagerTest<LogBlockManager>::RunBlockDistributionTest(const vector<string>& paths) {
+  vector<int> files_in_each_path(paths.size());
+  int num_blocks_per_dir = 30;
+  // Spread across 1, then 3, then 5 data directories.
+  for (int d: { 1, 3, 5 }) {
+    DistributeBlocksAcrossDirs(d, num_blocks_per_dir);
+
+    // Check that upon each addition of new paths to data dir groups, new files are being created.
+    // Since log blocks are placed and used randomly within a data dir group, the only expected
+    // behavior is that the total number of files and the number of paths with files will increase.
+    bool some_new_files = false;
+    bool some_new_paths = false;
+    for (int path_idx = 0; path_idx < paths.size(); path_idx++) {
+      int num_files = 0;
+      ASSERT_OK(CountFiles(paths[path_idx], &num_files));
+      int new_files = num_files - files_in_each_path[path_idx];
+      if (new_files > 0) {
+        some_new_files = true;
+        if (files_in_each_path[path_idx] == 0) {
+          some_new_paths = true;
+        }
+        files_in_each_path[path_idx] = num_files;
+      }
+    }
+    ASSERT_TRUE(some_new_paths);
+    ASSERT_TRUE(some_new_files);
+  }
 }
 
 template <>
@@ -150,12 +280,19 @@ void BlockManagerTest<FileBlockManager>::RunMultipathTest(const vector<string>&
                                                  &instance));
     }
   }
-
-  // Write ten blocks.
+  // Create a DataDirGroup for the data that's about to be inserted.
+  // Spread the data across all 3 paths. Writing twenty blocks randomly within
+  // this group should result in blocks being placed in every directory with a
+  // high probability.
+  CreateBlockOptions opts({ "multipath_test" });
+  FLAGS_fs_target_data_dirs_per_tablet = 3;
+  ASSERT_OK(bm_->dd_manager()->CreateDataDirGroup("multipath_test"));
+
+  // Write twenty blocks.
   const char* kTestData = "test data";
-  for (int i = 0; i < 10; i++) {
+  for (int i = 0; i < 20; i++) {
     unique_ptr<WritableBlock> written_block;
-    ASSERT_OK(bm_->CreateBlock(&written_block));
+    ASSERT_OK(bm_->CreateBlock(opts, &written_block));
     ASSERT_OK(written_block->Append(kTestData));
     ASSERT_OK(written_block->Close());
   }
@@ -163,36 +300,46 @@ void BlockManagerTest<FileBlockManager>::RunMultipathTest(const vector<string>&
   // Each path should now have some additional block subdirectories. We
   // can't know for sure exactly how many (depends on the block IDs
   // generated), but this ensures that at least some change were made.
+  int num_blocks = 0;
   for (const string& path : paths) {
     vector<string> children;
     ASSERT_OK(env_->GetChildren(path, &children));
-    ASSERT_GT(children.size(), 3);
+    int blocks_in_path = 0;
+    ASSERT_OK(CountFiles(path, &blocks_in_path));
+    num_blocks += blocks_in_path;
   }
+  ASSERT_EQ(20, num_blocks);
 }
 
 template <>
 void BlockManagerTest<LogBlockManager>::RunMultipathTest(const vector<string>& paths) {
   // Write (3 * numPaths * 2) blocks, in groups of (numPaths * 2). That should
   // yield two containers per path.
+  CreateBlockOptions opts({ "multipath_test" });
+  FLAGS_fs_target_data_dirs_per_tablet = 3;
+  ASSERT_OK(bm_->dd_manager()->CreateDataDirGroup("multipath_test"));
+
   const char* kTestData = "test data";
-  for (int i = 0; i < 3; i++) {
-    ScopedWritableBlockCloser closer;
-    for (int j = 0; j < paths.size() * 2; j++) {
-      unique_ptr<WritableBlock> block;
-      ASSERT_OK(bm_->CreateBlock(&block));
-      ASSERT_OK(block->Append(kTestData));
-      closer.AddBlock(std::move(block));
-    }
-    ASSERT_OK(closer.CloseBlocks());
+  ScopedWritableBlockCloser closer;
+  // Creates (numPaths * 2) containers.
+  for (int j = 0; j < paths.size() * 2; j++) {
+    unique_ptr<WritableBlock> block;
+    ASSERT_OK(bm_->CreateBlock(opts, &block));
+    ASSERT_OK(block->Append(kTestData));
+    closer.AddBlock(std::move(block));
   }
+  ASSERT_OK(closer.CloseBlocks());
 
-  // Verify the results: 7 children = dot, dotdot, instance file, and two
-  // containers (two files per container).
+  // Verify the results. Each path has dot, dotdot, instance file.
+  // (numPaths * 2) containers were created, each consisting of 2 files.
+  // Thus, there should be a total of (numPaths * (3 + 4)) files.
+  int sum = 0;
   for (const string& path : paths) {
     vector<string> children;
     ASSERT_OK(env_->GetChildren(path, &children));
-    ASSERT_EQ(children.size(), 7);
+    sum += children.size();
   }
+  ASSERT_EQ(paths.size() * 7, sum);
 }
 
 template <>
@@ -201,13 +348,13 @@ void BlockManagerTest<FileBlockManager>::RunMemTrackerTest() {
   ASSERT_OK(ReopenBlockManager(scoped_refptr<MetricEntity>(),
                                tracker,
                                { test_dir_ },
-                               false));
+                               false /* create */));
 
   // The file block manager does not allocate memory for persistent data.
   int64_t initial_mem = tracker->consumption();
   ASSERT_EQ(initial_mem, 0);
   unique_ptr<WritableBlock> writer;
-  ASSERT_OK(bm_->CreateBlock(&writer));
+  ASSERT_OK(bm_->CreateBlock(test_block_opts_, &writer));
   ASSERT_OK(writer->Close());
   ASSERT_EQ(tracker->consumption(), initial_mem);
 }
@@ -218,7 +365,7 @@ void BlockManagerTest<LogBlockManager>::RunMemTrackerTest() {
   ASSERT_OK(ReopenBlockManager(scoped_refptr<MetricEntity>(),
                                tracker,
                                { test_dir_ },
-                               false));
+                               false /* create */));
 
   // The initial consumption should be non-zero due to the block map.
   int64_t initial_mem = tracker->consumption();
@@ -226,7 +373,7 @@ void BlockManagerTest<LogBlockManager>::RunMemTrackerTest() {
 
   // Allocating a persistent block should increase the consumption.
   unique_ptr<WritableBlock> writer;
-  ASSERT_OK(bm_->CreateBlock(&writer));
+  ASSERT_OK(bm_->CreateBlock(test_block_opts_, &writer));
   ASSERT_OK(writer->Close());
   ASSERT_GT(tracker->consumption(), initial_mem);
 }
@@ -243,7 +390,7 @@ TYPED_TEST_CASE(BlockManagerTest, BlockManagers);
 TYPED_TEST(BlockManagerTest, EndToEndTest) {
   // Create a block.
   unique_ptr<WritableBlock> written_block;
-  ASSERT_OK(this->bm_->CreateBlock(&written_block));
+  ASSERT_OK(this->bm_->CreateBlock(this->test_block_opts_, &written_block));
 
   // Write some data to it.
   string test_data = "test data";
@@ -283,12 +430,33 @@ TYPED_TEST(BlockManagerTest, EndToEndTest) {
               .IsNotFound());
 }
 
+TYPED_TEST(BlockManagerTest, CreateBlocksInDataDirs) {
+  // Create a block before creating a data dir group.
+  CreateBlockOptions fake_block_opts({ "fake_tablet_name" });
+  Status s = this->bm_->CreateBlock(fake_block_opts, nullptr);
+  ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "Tried to get directory but no "
+                                    "DataDirGroup registered for tablet");
+
+  // Ensure the data dir groups can only be created once.
+  s = this->bm_->dd_manager()->CreateDataDirGroup(this->test_tablet_name_);
+  ASSERT_TRUE(s.IsAlreadyPresent()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "Tried to create DataDirGroup for tablet "
+                                    "but one is already registered");
+
+  DataDirGroupPB test_group_pb;
+  // Check that the in-memory DataDirGroup did not change.
+  ASSERT_TRUE(this->bm_->dd_manager()->GetDataDirGroupPB(
+      this->test_tablet_name_, &test_group_pb));
+  ASSERT_TRUE(MessageDifferencer::Equals(test_group_pb, this->test_group_pb_));
+}
+
 // Test that we can still read from an opened block after deleting it
 // (even if we can't open it again).
 TYPED_TEST(BlockManagerTest, ReadAfterDeleteTest) {
   // Write a new block.
   unique_ptr<WritableBlock> written_block;
-  ASSERT_OK(this->bm_->CreateBlock(&written_block));
+  ASSERT_OK(this->bm_->CreateBlock(this->test_block_opts_, &written_block));
   string test_data = "test data";
   ASSERT_OK(written_block->Append(test_data));
   ASSERT_OK(written_block->Close());
@@ -310,7 +478,7 @@ TYPED_TEST(BlockManagerTest, ReadAfterDeleteTest) {
 TYPED_TEST(BlockManagerTest, CloseTwiceTest) {
   // Create a new block and close it repeatedly.
   unique_ptr<WritableBlock> written_block;
-  ASSERT_OK(this->bm_->CreateBlock(&written_block));
+  ASSERT_OK(this->bm_->CreateBlock(this->test_block_opts_, &written_block));
   ASSERT_OK(written_block->Close());
   ASSERT_OK(written_block->Close());
 
@@ -339,7 +507,7 @@ TYPED_TEST(BlockManagerTest, CloseManyBlocksTest) {
     for (int i = 0; i < kNumBlocks; i++) {
       // Create a block.
       unique_ptr<WritableBlock> written_block;
-      ASSERT_OK(this->bm_->CreateBlock(&written_block));
+      ASSERT_OK(this->bm_->CreateBlock(this->test_block_opts_, &written_block));
 
       // Write 64k bytes of random data into it.
       uint8_t data[65536];
@@ -361,7 +529,7 @@ TYPED_TEST(BlockManagerTest, CloseManyBlocksTest) {
 // it doesn't break anything.
 TYPED_TEST(BlockManagerTest, FlushDataAsyncTest) {
   unique_ptr<WritableBlock> written_block;
-  ASSERT_OK(this->bm_->CreateBlock(&written_block));
+  ASSERT_OK(this->bm_->CreateBlock(this->test_block_opts_, &written_block));
   string test_data = "test data";
   ASSERT_OK(written_block->Append(test_data));
   ASSERT_OK(written_block->FlushDataAsync());
@@ -371,7 +539,7 @@ TYPED_TEST(BlockManagerTest, WritableBlockStateTest) {
   unique_ptr<WritableBlock> written_block;
 
   // Common flow: CLEAN->DIRTY->CLOSED.
-  ASSERT_OK(this->bm_->CreateBlock(&written_block));
+  ASSERT_OK(this->bm_->CreateBlock(this->test_block_opts_, &written_block));
   ASSERT_EQ(WritableBlock::CLEAN, written_block->state());
   string test_data = "test data";
   ASSERT_OK(written_block->Append(test_data));
@@ -382,7 +550,7 @@ TYPED_TEST(BlockManagerTest, WritableBlockStateTest) {
   ASSERT_EQ(WritableBlock::CLOSED, written_block->state());
 
   // Test FLUSHING->CLOSED transition.
-  ASSERT_OK(this->bm_->CreateBlock(&written_block));
+  ASSERT_OK(this->bm_->CreateBlock(this->test_block_opts_, &written_block));
   ASSERT_OK(written_block->Append(test_data));
   ASSERT_OK(written_block->FlushDataAsync());
   ASSERT_EQ(WritableBlock::FLUSHING, written_block->state());
@@ -390,17 +558,17 @@ TYPED_TEST(BlockManagerTest, WritableBlockStateTest) {
   ASSERT_EQ(WritableBlock::CLOSED, written_block->state());
 
   // Test CLEAN->CLOSED transition.
-  ASSERT_OK(this->bm_->CreateBlock(&written_block));
+  ASSERT_OK(this->bm_->CreateBlock(this->test_block_opts_, &written_block));
   ASSERT_OK(written_block->Close());
   ASSERT_EQ(WritableBlock::CLOSED, written_block->state());
 
   // Test FlushDataAsync() no-op.
-  ASSERT_OK(this->bm_->CreateBlock(&written_block));
+  ASSERT_OK(this->bm_->CreateBlock(this->test_block_opts_, &written_block));
   ASSERT_OK(written_block->FlushDataAsync());
   ASSERT_EQ(WritableBlock::FLUSHING, written_block->state());
 
   // Test DIRTY->CLOSED transition.
-  ASSERT_OK(this->bm_->CreateBlock(&written_block));
+  ASSERT_OK(this->bm_->CreateBlock(this->test_block_opts_, &written_block));
   ASSERT_OK(written_block->Append(test_data));
   ASSERT_OK(written_block->Close());
   ASSERT_EQ(WritableBlock::CLOSED, written_block->state());
@@ -430,10 +598,10 @@ TYPED_TEST(BlockManagerTest, AbortTest) {
   ASSERT_OK(this->ReopenBlockManager(entity,
                                      shared_ptr<MemTracker>(),
                                      { this->test_dir_ },
-                                     false));
+                                     false /* create */));
 
   unique_ptr<WritableBlock> written_block;
-  ASSERT_OK(this->bm_->CreateBlock(&written_block));
+  ASSERT_OK(this->bm_->CreateBlock(this->test_block_opts_, &written_block));
   string test_data = "test data";
   ASSERT_OK(written_block->Append(test_data));
   ASSERT_OK(written_block->Abort());
@@ -441,7 +609,7 @@ TYPED_TEST(BlockManagerTest, AbortTest) {
   ASSERT_TRUE(this->bm_->OpenBlock(written_block->id(), nullptr)
               .IsNotFound());
 
-  ASSERT_OK(this->bm_->CreateBlock(&written_block));
+  ASSERT_OK(this->bm_->CreateBlock(this->test_block_opts_, &written_block));
   ASSERT_OK(written_block->Append(test_data));
   ASSERT_OK(written_block->FlushDataAsync());
   ASSERT_OK(written_block->Abort());
@@ -460,13 +628,13 @@ TYPED_TEST(BlockManagerTest, PersistenceTest) {
   unique_ptr<WritableBlock> written_block1;
   unique_ptr<WritableBlock> written_block2;
   unique_ptr<WritableBlock> written_block3;
-  ASSERT_OK(this->bm_->CreateBlock(&written_block1));
+  ASSERT_OK(this->bm_->CreateBlock(this->test_block_opts_, &written_block1));
   ASSERT_OK(written_block1->Close());
-  ASSERT_OK(this->bm_->CreateBlock(&written_block2));
+  ASSERT_OK(this->bm_->CreateBlock(this->test_block_opts_, &written_block2));
   string test_data = "test data";
   ASSERT_OK(written_block2->Append(test_data));
   ASSERT_OK(written_block2->Close());
-  ASSERT_OK(this->bm_->CreateBlock(&written_block3));
+  ASSERT_OK(this->bm_->CreateBlock(this->test_block_opts_, &written_block3));
   ASSERT_OK(written_block3->Append(test_data));
   ASSERT_OK(written_block3->Close());
   ASSERT_OK(this->bm_->DeleteBlock(written_block3->id()));
@@ -501,6 +669,19 @@ TYPED_TEST(BlockManagerTest, PersistenceTest) {
               .IsNotFound());
 }
 
+TYPED_TEST(BlockManagerTest, BlockDistributionTest) {
+  vector<string> dist_paths;  // Five data dir paths to recreate the block manager on.
+  for (int dir_num = 0; dir_num < 5; ++dir_num) {
+    dist_paths.emplace_back(this->GetTestPath(Substitute("block_dist_path$0", dir_num)));
+  }
+  ASSERT_OK(this->ReopenBlockManager(scoped_refptr<MetricEntity>(),
+                                     shared_ptr<MemTracker>(),
+                                     dist_paths,
+                                     true /* create */,
+                                     false /* load_test_group */));
+  ASSERT_NO_FATAL_FAILURE(this->RunBlockDistributionTest(dist_paths));
+}
+
 TYPED_TEST(BlockManagerTest, MultiPathTest) {
   // Recreate the block manager with three paths.
   vector<string> paths;
@@ -510,7 +691,8 @@ TYPED_TEST(BlockManagerTest, MultiPathTest) {
   ASSERT_OK(this->ReopenBlockManager(scoped_refptr<MetricEntity>(),
                                      shared_ptr<MemTracker>(),
                                      paths,
-                                     true));
+                                     true /* create */,
+                                     false /* load_test_group */));
 
   ASSERT_NO_FATAL_FAILURE(this->RunMultipathTest(paths));
 }
@@ -522,7 +704,7 @@ static void CloseHelper(ReadableBlock* block) {
 // Tests that ReadableBlock::Close() is thread-safe and idempotent.
 TYPED_TEST(BlockManagerTest, ConcurrentCloseReadableBlockTest) {
   unique_ptr<WritableBlock> writer;
-  ASSERT_OK(this->bm_->CreateBlock(&writer));
+  ASSERT_OK(this->bm_->CreateBlock(this->test_block_opts_, &writer));
   ASSERT_OK(writer->Close());
 
   unique_ptr<ReadableBlock> reader;
@@ -555,7 +737,7 @@ TYPED_TEST(BlockManagerTest, MetricsTest) {
     unique_ptr<ReadableBlock> reader;
 
     // An open writer. Also reflected in total_writable_blocks.
-    ASSERT_OK(this->bm_->CreateBlock(&writer));
+    ASSERT_OK(this->bm_->CreateBlock(this->test_block_opts_, &writer));
     ASSERT_NO_FATAL_FAILURE(CheckMetrics(
         entity, 0, 1, i, i + 1,
         i * kTestData.length(), i * kTestData.length()));
@@ -601,11 +783,12 @@ TYPED_TEST(BlockManagerTest, TestDiskSpaceCheck) {
   ASSERT_OK(this->ReopenBlockManager(entity,
                                      shared_ptr<MemTracker>(),
                                      { GetTestDataDirectory() },
-                                     false));
+                                     false /* create */));
 
   FLAGS_fs_data_dirs_full_disk_cache_seconds = 0; // Don't cache device fullness.
   FLAGS_fs_data_dirs_reserved_bytes = 1; // Keep at least 1 byte reserved in the FS.
   FLAGS_log_container_preallocate_bytes = 0; // Disable preallocation.
+  FLAGS_fs_target_data_dirs_per_tablet = 3; // Use a subset of directories instead of all.
 
   // Normally, a data dir is checked for fullness only after a block is closed;
   // if it's now full, the next attempt at block creation will fail. Only when
@@ -622,12 +805,12 @@ TYPED_TEST(BlockManagerTest, TestDiskSpaceCheck) {
     for (int attempt = 0; attempt < 3; attempt++) {
       unique_ptr<WritableBlock> writer;
       LOG(INFO) << "Attempt #" << ++i;
-      Status s = this->bm_->CreateBlock(&writer);
+      Status s = this->bm_->CreateBlock(this->test_block_opts_, &writer);
       if (FLAGS_disk_reserved_bytes_free_for_testing < FLAGS_fs_data_dirs_reserved_bytes) {
         if (data_dir_observed_full) {
           // The dir was previously observed as full, so CreateBlock() checked
           // fullness again and failed.
-          ASSERT_TRUE(s.IsIOError());
+          ASSERT_TRUE(s.IsIOError()) << s.ToString();
           ASSERT_STR_CONTAINS(s.ToString(), "All data directories are full");
         } else {
           ASSERT_OK(s);
@@ -677,7 +860,7 @@ TYPED_TEST(BlockManagerTest, TestMetadataOkayDespiteFailedWrites) {
   // Creates a block, writing the result to 'out' on success.
   auto create_a_block = [&](BlockId* out) -> Status {
     unique_ptr<WritableBlock> block;
-    RETURN_NOT_OK(this->bm_->CreateBlock(&block));
+    RETURN_NOT_OK(this->bm_->CreateBlock(this->test_block_opts_, &block));
     for (int i = 0; i < kNumAppends; i++) {
       RETURN_NOT_OK(block->Append(kTestData));
     }
@@ -750,7 +933,7 @@ TYPED_TEST(BlockManagerTest, TestMetadataOkayDespiteFailedWrites) {
     ASSERT_OK(this->ReopenBlockManager(scoped_refptr<MetricEntity>(),
                                        shared_ptr<MemTracker>(),
                                        { GetTestDataDirectory() },
-                                       false));
+                                       false /* create */));
   }
 }
 
@@ -758,7 +941,7 @@ TYPED_TEST(BlockManagerTest, TestGetAllBlockIds) {
   vector<BlockId> ids;
   for (int i = 0; i < 100; i++) {
     unique_ptr<WritableBlock> block;
-    ASSERT_OK(this->bm_->CreateBlock(&block));
+    ASSERT_OK(this->bm_->CreateBlock(this->test_block_opts_, &block));
     ASSERT_OK(block->Close());
     ids.push_back(block->id());
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/732ee211/src/kudu/fs/block_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager.h b/src/kudu/fs/block_manager.h
index f9a19a4..7874879 100644
--- a/src/kudu/fs/block_manager.h
+++ b/src/kudu/fs/block_manager.h
@@ -42,6 +42,7 @@ class Slice;
 namespace fs {
 
 class BlockManager;
+class DataDirManager;
 struct FsReport;
 
 // The smallest unit of Kudu data that is backed by the local filesystem.
@@ -165,8 +166,12 @@ class ReadableBlock : public Block {
   virtual size_t memory_footprint() const = 0;
 };
 
-// Provides options and hints for block placement.
+// Provides options and hints for block placement. Currently this identifies
+// the DataDirGroup in which to place a block, via the owning tablet's ID. In
+// the future it may also be used to steer blocks to directories by block type
+// (e.g. to prefer bloom block placement into SSD-backed directories).
 struct CreateBlockOptions {
+  const std::string tablet_id;  // ID of the tablet whose DataDirGroup is used.
 };
 
 // Block manager creation options.
@@ -226,9 +231,6 @@ class BlockManager {
   virtual Status CreateBlock(const CreateBlockOptions& opts,
                              std::unique_ptr<WritableBlock>* block) = 0;
 
-  // Like the above but uses default options.
-  virtual Status CreateBlock(std::unique_ptr<WritableBlock>* block) = 0;
-
   // Opens an existing block for reading.
   //
   // While it is safe to delete a block that has already been opened, it is
@@ -263,6 +265,9 @@ class BlockManager {
   // concurrent operations are ongoing, some of the blocks themselves may not
   // even exist after the call.
   virtual Status GetAllBlockIds(std::vector<BlockId>* block_ids) = 0;
+
+  // Exposes the underlying DataDirManager.
+  virtual DataDirManager* dd_manager() = 0;
 };
 
 // Closes a group of blocks.

http://git-wip-us.apache.org/repos/asf/kudu/blob/732ee211/src/kudu/fs/data_dirs-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/data_dirs-test.cc b/src/kudu/fs/data_dirs-test.cc
new file mode 100644
index 0000000..f685bc8
--- /dev/null
+++ b/src/kudu/fs/data_dirs-test.cc
@@ -0,0 +1,281 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <set>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+#include <gflags/gflags_declare.h>
+
+#include "kudu/fs/fs.pb.h"
+#include "kudu/fs/block_manager.h"
+#include "kudu/fs/data_dirs.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/test_util.h"
+#include "kudu/gutil/map-util.h"
+
+using std::set;
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+DECLARE_int32(fs_data_dirs_full_disk_cache_seconds);
+DECLARE_int64(disk_reserved_bytes_free_for_testing);
+DECLARE_int64(fs_data_dirs_reserved_bytes);
+DECLARE_uint32(fs_target_data_dirs_per_tablet);
+
+namespace kudu {
+namespace fs {
+
+using internal::DataDirGroup;
+
+static const char* kDirNamePrefix = "test_data_dir";
+static const int kNumDirs = 10;
+static const int kMaxPaths = (1 << 16) - 1;
+
+class DataDirGroupTest : public KuduTest {
+ public:
+  DataDirGroupTest() :
+      test_tablet_name_("test_tablet"),
+      test_block_opts_(CreateBlockOptions({ test_tablet_name_ })),
+      dd_manager_(new DataDirManager(env_, scoped_refptr<kudu::MetricEntity>(),
+            "file", GetDirNames(kNumDirs))) {}
+
+  // Creates and opens the data dirs, with groups targeting just over half of them.
+  void SetUp() override {
+    KuduTest::SetUp();
+    FLAGS_fs_target_data_dirs_per_tablet = kNumDirs / 2 + 1;
+    ASSERT_OK(dd_manager_->Create(0));
+    ASSERT_OK(dd_manager_->Open(kMaxPaths, DataDirManager::LockMode::NONE));
+  }
+
+ protected:
+  vector<string> GetDirNames(int num_dirs) {
+    vector<string> dir_paths;
+    for (int idx = 0; idx < num_dirs; ++idx) {
+      dir_paths.push_back(GetTestPath(Substitute("$0-$1", kDirNamePrefix, idx)));
+    }
+    return dir_paths;
+  }
+
+  const string test_tablet_name_;
+  const CreateBlockOptions test_block_opts_;
+  std::unique_ptr<DataDirManager> dd_manager_;
+};
+
+TEST_F(DataDirGroupTest, TestCreateGroup) {
+  // Test that the DataDirManager doesn't know about the tablets we're about
+  // to insert.
+  DataDir* dd = nullptr;
+  CreateBlockOptions non_existent_tablet_opts({ test_tablet_name_ });
+  Status s = dd_manager_->GetNextDataDir(non_existent_tablet_opts, &dd);
+  ASSERT_EQ(nullptr, dd);
+  ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "Tried to get directory but no DataDirGroup "
+                                    "registered for tablet");
+
+  DataDirGroupPB orig_pb;
+  ASSERT_OK(dd_manager_->CreateDataDirGroup(test_tablet_name_));
+  ASSERT_TRUE(dd_manager_->GetDataDirGroupPB(test_tablet_name_, &orig_pb));
+
+  // Ensure that the DataDirManager will not create a group for a tablet that
+  // it already knows about.
+  s = dd_manager_->CreateDataDirGroup(test_tablet_name_);
+  ASSERT_TRUE(s.IsAlreadyPresent()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "Tried to create DataDirGroup for tablet "
+                                    "but one is already registered");
+  DataDirGroupPB pb;
+  ASSERT_TRUE(dd_manager_->GetDataDirGroupPB(test_tablet_name_, &pb));
+
+  // Verify that the group is unchanged after failing to re-create it. Sizes
+  // are compared first so that the element-wise loop cannot index past the end.
+  ASSERT_EQ(orig_pb.uuids().size(), pb.uuids().size());
+  for (int i = 0; i < pb.uuids().size(); i++) {
+    ASSERT_EQ(orig_pb.uuids(i), pb.uuids(i));
+  }
+
+  // Check that the tablet's DataDirGroup spans the right number of dirs.
+  int num_dirs_with_tablets = 0;
+  for (const auto& e: dd_manager_->tablets_by_uuid_idx_map_) {
+    if (!e.second.empty()) {
+      ASSERT_EQ(1, e.second.size());
+      num_dirs_with_tablets++;
+    }
+  }
+  ASSERT_EQ(FLAGS_fs_target_data_dirs_per_tablet, num_dirs_with_tablets);
+
+  // Try to use the group.
+  ASSERT_OK(dd_manager_->GetNextDataDir(test_block_opts_, &dd));
+  ASSERT_FALSE(dd->is_full());
+}
+
+TEST_F(DataDirGroupTest, TestLoadFromPB) {
+  // Create a group and save it as a PB, delete the group, then reload it from the PB.
+  DataDirGroupPB orig_pb;
+  ASSERT_OK(dd_manager_->CreateDataDirGroup(test_tablet_name_));
+  ASSERT_TRUE(dd_manager_->GetDataDirGroupPB(test_tablet_name_, &orig_pb));
+  dd_manager_->DeleteDataDirGroup(test_tablet_name_);
+  ASSERT_OK(dd_manager_->LoadDataDirGroupFromPB(test_tablet_name_, orig_pb));
+
+  // Check that the reloaded DataDirGroup spans the right number of dirs.
+  int num_dirs_with_tablets = 0;
+  for (const auto& e: dd_manager_->tablets_by_uuid_idx_map_) {
+    if (!e.second.empty()) {
+      ASSERT_EQ(1, e.second.size());
+      num_dirs_with_tablets++;
+    }
+  }
+  ASSERT_EQ(FLAGS_fs_target_data_dirs_per_tablet, num_dirs_with_tablets);
+
+  // Loading the same PB again must fail with AlreadyPresent, since the
+  // DataDirManager now knows about the tablet.
+  Status s = dd_manager_->LoadDataDirGroupFromPB(test_tablet_name_, orig_pb);
+  ASSERT_TRUE(s.IsAlreadyPresent()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "Tried to load DataDirGroup for tablet but "
+                                    "one is already registered");
+}
+
+TEST_F(DataDirGroupTest, TestDeleteDataDirGroup) {
+  ASSERT_OK(dd_manager_->CreateDataDirGroup(test_tablet_name_));
+  DataDir* dd = nullptr;
+  ASSERT_OK(dd_manager_->GetNextDataDir(test_block_opts_, &dd));
+  ASSERT_FALSE(dd->is_full());
+  dd_manager_->DeleteDataDirGroup(test_tablet_name_);
+  // Once the group is deleted, lookups for the tablet must fail with NotFound.
+  Status s = dd_manager_->GetNextDataDir(test_block_opts_, &dd);
+  ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "Tried to get directory but no DataDirGroup "
+                                    "registered for tablet");
+}
+
+TEST_F(DataDirGroupTest, TestFullDisk) {
+  FLAGS_fs_data_dirs_full_disk_cache_seconds = 0;       // Don't cache device fullness.
+  FLAGS_fs_data_dirs_reserved_bytes = 1;                // Reserve 1 byte per data dir.
+  FLAGS_disk_reserved_bytes_free_for_testing = 0;       // Simulated free space: none.
+  // With less free space than reserved, group creation must report all dirs full.
+  Status s = dd_manager_->CreateDataDirGroup(test_tablet_name_);
+  ASSERT_TRUE(s.IsIOError()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "All data directories are full");
+}
+
+TEST_F(DataDirGroupTest, TestLoadBalancingDistribution) {
+  FLAGS_fs_target_data_dirs_per_tablet = 3;
+  const double kNumTablets = 20;
+
+  // Add 'kNumTablets' tablets, each with groups of size
+  // 'FLAGS_fs_target_data_dirs_per_tablet'.
+  for (int tablet_idx = 0; tablet_idx < kNumTablets; tablet_idx++) {
+    ASSERT_OK(dd_manager_->CreateDataDirGroup(Substitute("$0-$1", test_tablet_name_, tablet_idx)));
+  }
+  const double kMeanTabletsPerDir = kNumTablets * FLAGS_fs_target_data_dirs_per_tablet / kNumDirs;
+
+  // Calculate the standard deviation of the number of tablets per disk.
+  // If tablets are evenly spread across directories, this should be small.
+  double sum_squared_dev = 0;
+  for (const auto& e : dd_manager_->tablets_by_uuid_idx_map_) {
+    LOG(INFO) << Substitute("$0 is storing data from $1 tablets.",
+        dd_manager_->data_dir_by_uuid_idx_[e.first]->dir(), e.second.size());
+    double deviation = static_cast<double>(e.second.size()) - kMeanTabletsPerDir;
+    sum_squared_dev += deviation * deviation;
+  }
+  double stddev = sqrt(sum_squared_dev / kNumDirs);
+  LOG(INFO) << Substitute("$0 tablets stored across $1 directories.", kNumTablets, kNumDirs);
+
+  // Looping this 1000 times yielded a couple stddev values over 2.0. A high
+  // standard deviation does not necessarily reveal an error, but does indicate
+  // a relatively unlikely distribution of data directory groups.
+  LOG(INFO) << "Standard deviation: " << stddev;
+  // Note: stddev is only logged; this test makes no assertion about it.
+}
+
+TEST_F(DataDirGroupTest, TestLoadBalancingBias) {
+  // Shows that block placement will tend to favor directories with less load.
+  // First add a set of tablets for skew. Then add more tablets and check that
+  // there's still roughly a uniform distribution.
+  FLAGS_fs_target_data_dirs_per_tablet = 5;
+
+  // Start with one data directory group that has some tablets.
+  const double kTabletsPerSkewedDir = 10;
+  const int kNumSkewedDirs = FLAGS_fs_target_data_dirs_per_tablet;
+
+  // Number of tablets (pre-replication) added after the skew tablets.
+  // With 10 directories and a total load of 20 tablets * 5 dirs each, the
+  // mean works out to 10 tablets associated with each dir.
+  const double kNumAdditionalTablets = 10;
+  const string kSkewTabletPrefix = "skew_tablet";
+
+  // Manually create a group for the skewed directories.
+  //
+  // Note: this should not happen in the wild and is used here as a way to
+  // introduce some initial skew to the distribution.
+  auto uuid_idx_iter = dd_manager_->tablets_by_uuid_idx_map_.begin();
+  vector<uint16_t> skewed_dir_indices;
+  for (int i = 0; i < kNumSkewedDirs; i++) {
+    uint16_t uuid_idx = uuid_idx_iter->first;
+    skewed_dir_indices.push_back(uuid_idx);
+    uuid_idx_iter++;
+  }
+
+  // Add tablets to each skewed directory.
+  for (int skew_tablet_idx = 0; skew_tablet_idx < kTabletsPerSkewedDir; skew_tablet_idx++) {
+    string skew_tablet = Substitute("$0-$1", kSkewTabletPrefix, skew_tablet_idx);
+    InsertOrDie(&dd_manager_->group_by_tablet_map_, skew_tablet, DataDirGroup(skewed_dir_indices));
+    for (uint16_t uuid_idx : skewed_dir_indices) {
+      InsertOrDie(&FindOrDie(dd_manager_->tablets_by_uuid_idx_map_, uuid_idx), skew_tablet);
+    }
+  }
+
+  // Add the additional tablets.
+  for (int tablet_idx = 0; tablet_idx < kNumAdditionalTablets; tablet_idx++) {
+    ASSERT_OK(dd_manager_->CreateDataDirGroup(Substitute("$0-$1", test_tablet_name_, tablet_idx)));
+  }
+
+  // Calculate the standard deviation of the number of tablets per disk.
+  double sum_squared_dev = 0;
+  const double kMeanTabletsPerDir = (kTabletsPerSkewedDir * kNumSkewedDirs +
+      kNumAdditionalTablets * FLAGS_fs_target_data_dirs_per_tablet) / kNumDirs;
+  for (const auto& e : dd_manager_->tablets_by_uuid_idx_map_) {
+    LOG(INFO) << Substitute("$0 is storing data from $1 tablets.",
+        dd_manager_->data_dir_by_uuid_idx_[e.first]->dir(), e.second.size());
+    double deviation = static_cast<double>(e.second.size()) - kMeanTabletsPerDir;
+    sum_squared_dev += deviation * deviation;
+  }
+  double stddev = sqrt(sum_squared_dev / kNumDirs);
+
+  // Only logged: looping this 5000 times yielded no stddev values higher than 5.0.
+  LOG(INFO) << "Standard deviation: " << stddev;
+
+  // Since the skewed directories start out with 10 tablets, a block-placement
+  // heuristic that only takes into account tablet load would fail to add more
+  // tablets to these skewed directories, as it would lead to all directories
+  // having the mean, 10, tablets. Instead, the block-placement heuristic should
+  // not completely ignore the initially skewed dirs.
+  bool some_added_to_skewed_dirs = false;
+  for (uint16_t skewed_uuid_index : skewed_dir_indices) {
+    set<string>* tablets = FindOrNull(dd_manager_->tablets_by_uuid_idx_map_, skewed_uuid_index);
+    ASSERT_NE(nullptr, tablets);
+    if (tablets->size() > kTabletsPerSkewedDir) {
+      some_added_to_skewed_dirs = true;
+    }
+  }
+  ASSERT_TRUE(some_added_to_skewed_dirs);
+}
+
+} // namespace fs
+} //namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/732ee211/src/kudu/fs/data_dirs.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/data_dirs.cc b/src/kudu/fs/data_dirs.cc
index 64a52a4..a9fe498 100644
--- a/src/kudu/fs/data_dirs.cc
+++ b/src/kudu/fs/data_dirs.cc
@@ -17,22 +17,24 @@
 
 #include "kudu/fs/data_dirs.h"
 
+#include <algorithm>
 #include <cerrno>
 #include <deque>
 #include <memory>
 #include <mutex>
+#include <numeric>
 #include <string>
 #include <unordered_map>
 #include <unordered_set>
 #include <utility>
 #include <vector>
 
+#include <boost/optional.hpp>
 #include <gflags/gflags.h>
 #include <glog/logging.h>
 
 #include "kudu/fs/block_manager.h"
 #include "kudu/fs/block_manager_util.h"
-#include "kudu/fs/fs.pb.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/stl_util.h"
@@ -47,10 +49,21 @@
 #include "kudu/util/monotime.h"
 #include "kudu/util/oid_generator.h"
 #include "kudu/util/path_util.h"
+#include "kudu/util/random_util.h"
 #include "kudu/util/status.h"
 #include "kudu/util/stopwatch.h"
+#include "kudu/util/test_util_prod.h"
 #include "kudu/util/threadpool.h"
 
+DEFINE_uint32(fs_target_data_dirs_per_tablet, 0,
+              "Indicates the target number of data dirs to spread each "
+              "tablet's data across. If greater than the number of data dirs "
+              "available, data will be striped across those available. The "
+              "default value 0 indicates striping should occur across all "
+              "data directories.");
+TAG_FLAG(fs_target_data_dirs_per_tablet, advanced);
+TAG_FLAG(fs_target_data_dirs_per_tablet, experimental);
+
 DEFINE_int64(fs_data_dirs_reserved_bytes, -1,
              "Number of bytes to reserve on each data directory filesystem for "
              "non-Kudu usage. The default, which is represented by -1, is that "
@@ -78,16 +91,19 @@ namespace kudu {
 namespace fs {
 
 using env_util::ScopedFileDeleter;
+using internal::DataDirGroup;
+using std::default_random_engine;
 using std::deque;
+using std::iota;
+using std::shuffle;
 using std::string;
 using std::unique_ptr;
 using std::unordered_set;
 using std::vector;
 using strings::Substitute;
 
-namespace {
 
-const char kInstanceMetadataFileName[] = "block_manager_instance";
+namespace {
 
 const char kHolePunchErrorMsg[] =
     "Error during hole punch test. The log block manager requires a "
@@ -151,7 +167,6 @@ DataDirMetrics::DataDirMetrics(const scoped_refptr<MetricEntity>& entity)
 }
 #undef GINIT
 
-
 DataDir::DataDir(Env* env,
                  DataDirMetrics* metrics,
                  string dir,
@@ -243,7 +258,7 @@ DataDirManager::DataDirManager(Env* env,
     : env_(env),
       block_manager_type_(std::move(block_manager_type)),
       paths_(std::move(paths)),
-      data_dirs_next_(0) {
+      rng_(GetRandomSeed32()) {
   DCHECK_GT(paths_.size(), 0);
 
   if (metric_entity) {
@@ -377,6 +392,8 @@ Status DataDirManager::Open(int max_data_dirs, LockMode mode) {
   }
 
   // Build uuid index and data directory maps.
+  UuidByUuidIndexMap uuid_by_idx;
+  UuidIndexByUuidMap idx_by_uuid;
   UuidIndexMap dd_by_uuid_idx;
   ReverseUuidIndexMap uuid_idx_by_dd;
   for (const auto& dd : dds) {
@@ -393,44 +410,151 @@ Status DataDirManager::Open(int max_data_dirs, LockMode mode) {
       return Status::NotSupported(
           Substitute("Block manager supports a maximum of $0 paths", max_data_dirs));
     }
+    InsertOrDie(&uuid_by_idx, idx, path_set.uuid());
+    InsertOrDie(&idx_by_uuid, path_set.uuid(), idx);
     InsertOrDie(&dd_by_uuid_idx, idx, dd.get());
     InsertOrDie(&uuid_idx_by_dd, dd.get(), idx);
+    InsertOrDie(&tablets_by_uuid_idx_map_, idx, set<string>());
   }
 
   data_dirs_.swap(dds);
+  uuid_by_idx_.swap(uuid_by_idx);
+  idx_by_uuid_.swap(idx_by_uuid);
   data_dir_by_uuid_idx_.swap(dd_by_uuid_idx);
   uuid_idx_by_data_dir_.swap(uuid_idx_by_dd);
   return Status::OK();
 }
 
-Status DataDirManager::GetNextDataDir(DataDir** dir) {
-  // Round robin through the data dirs, ignoring ones that are full.
-  unordered_set<DataDir*> full_dds;
-  while (true) {
-    int32_t cur_idx;
-    int32_t next_idx;
-    do {
-      cur_idx = data_dirs_next_.Load();
-      next_idx = (cur_idx + 1) % data_dirs_.size();
-    } while (!data_dirs_next_.CompareAndSet(cur_idx, next_idx));
-
-    DataDir* candidate = data_dirs_[cur_idx].get();
-    RETURN_NOT_OK(candidate->RefreshIsFull(
-        DataDir::RefreshMode::EXPIRED_ONLY));
+Status DataDirManager::LoadDataDirGroupFromPB(const std::string& tablet_id,
+                                              const DataDirGroupPB& pb) {
+  // Both the group map and the per-dir tablet sets are mutated below.
+  std::lock_guard<percpu_rwlock> lock(dir_group_lock_);
+  DataDirGroup group_from_pb = DataDirGroup::FromPB(pb, idx_by_uuid_);
+  if (InsertOrReturnExisting(&group_by_tablet_map_, tablet_id, group_from_pb) != nullptr) {
+    // A group is already registered for this tablet; report it to the caller.
+    return Status::AlreadyPresent("Tried to load DataDirGroup for tablet but one is already "
+                                  "registered", tablet_id);
+  }
+  // Record the tablet against every data dir in its newly loaded group.
+  for (uint16_t uuid_idx : group_from_pb.uuid_indices()) {
+    InsertOrDie(&FindOrDie(tablets_by_uuid_idx_map_, uuid_idx), tablet_id);
+  }
+  return Status::OK();
+}
+
+Status DataDirManager::CreateDataDirGroup(const string& tablet_id,
+                                          DirDistributionMode mode) {
+  std::lock_guard<percpu_rwlock> write_lock(dir_group_lock_);
+  if (ContainsKey(group_by_tablet_map_, tablet_id)) {
+    return Status::AlreadyPresent("Tried to create DataDirGroup for tablet but one is already "
+                                  "registered", tablet_id);
+  }
+  // Clamp the target group size to the total number of data dirs.
+  uint32_t group_target_size = std::min(FLAGS_fs_target_data_dirs_per_tablet,
+                                        static_cast<uint32_t>(data_dirs_.size()));
+  vector<uint16_t> group_indices;
+  if (group_target_size == 0 || mode == DirDistributionMode::ACROSS_ALL_DIRS) {
+    // A target of 0 or ACROSS_ALL_DIRS: use every dir, regardless of its state.
+    AppendKeysFromMap(data_dir_by_uuid_idx_, &group_indices);
+  } else {
+    // Randomly select non-full dirs, preferring those with fewer tablets.
+    RETURN_NOT_OK(GetDirsForGroupUnlocked(group_target_size, &group_indices));
+    if (group_indices.empty()) {
+      return Status::IOError("All data directories are full", "", ENOSPC);
+    }
+  }
+  InsertOrDie(&group_by_tablet_map_, tablet_id, DataDirGroup(group_indices));
+  for (uint16_t uuid_idx : group_indices) {
+    InsertOrDie(&FindOrDie(tablets_by_uuid_idx_map_, uuid_idx), tablet_id);
+  }
+  return Status::OK();
+}
+
+Status DataDirManager::GetNextDataDir(const CreateBlockOptions& opts, DataDir** dir) {
+  shared_lock<rw_spinlock> lock(dir_group_lock_.get_lock());
+  const vector<uint16_t>* group_uuid_indices;
+  vector<uint16_t> all_uuid_indices;
+  if (PREDICT_FALSE(opts.tablet_id.empty())) {
+    // Test-only path: with no natural tablet_id to look up, fall back to
+    // choosing among every known data directory.
+    CHECK(IsGTest());
+    AppendKeysFromMap(data_dir_by_uuid_idx_, &all_uuid_indices);
+    group_uuid_indices = &all_uuid_indices;
+  } else {
+    // Placement is restricted to the group registered for this tablet.
+    DataDirGroup* group = FindOrNull(group_by_tablet_map_, opts.tablet_id);
+    if (group == nullptr) {
+      return Status::NotFound("Tried to get directory but no DataDirGroup "
+                              "registered for tablet", opts.tablet_id);
+    }
+    group_uuid_indices = &group->uuid_indices();
+  }
+  // NOTE(review): rng_ is advanced under a shared lock; confirm it is thread-safe.
+  vector<int> visit_order(group_uuid_indices->size());
+  iota(visit_order.begin(), visit_order.end(), 0);
+  shuffle(visit_order.begin(), visit_order.end(), default_random_engine(rng_.Next()));
+
+  // Probe members in shuffled order and hand back the first that is not full.
+  for (int pos : visit_order) {
+    DataDir* candidate = FindOrDie(data_dir_by_uuid_idx_, (*group_uuid_indices)[pos]);
+    RETURN_NOT_OK(candidate->RefreshIsFull(DataDir::RefreshMode::EXPIRED_ONLY));
     if (!candidate->is_full()) {
       *dir = candidate;
       return Status::OK();
     }
+  }
+  return Status::IOError(
+      "All data directories are full. Please free some disk space or "
+      "consider changing the fs_data_dirs_reserved_bytes configuration "
+      "parameter", "", ENOSPC);
+}
 
-    // This data dir was full. If all are full, we can't satisfy the request.
-    full_dds.insert(candidate);
-    if (full_dds.size() == data_dirs_.size()) {
-      return Status::IOError(
-          "All data directories are full. Please free some disk space or "
-          "consider changing the fs_data_dirs_reserved_bytes configuration "
-          "parameter", "", ENOSPC);
+void DataDirManager::DeleteDataDirGroup(const std::string& tablet_id) {
+  std::lock_guard<percpu_rwlock> lock(dir_group_lock_);
+  // Deleting a tablet that has no registered group is a no-op.
+  const DataDirGroup* doomed = FindOrNull(group_by_tablet_map_, tablet_id);
+  if (doomed == nullptr) return;
+
+  // Unlink the tablet from each dir of its group, then drop the group itself.
+  for (const uint16_t dir_idx : doomed->uuid_indices()) {
+    FindOrDie(tablets_by_uuid_idx_map_, dir_idx).erase(tablet_id);
+  }
+  group_by_tablet_map_.erase(tablet_id);
+}
+
+bool DataDirManager::GetDataDirGroupPB(const std::string& tablet_id,
+                                       DataDirGroupPB* pb) const {
+  shared_lock<rw_spinlock> lock(dir_group_lock_.get_lock());
+  const DataDirGroup* group = FindOrNull(group_by_tablet_map_, tablet_id);
+  // Unknown tablet: report failure and leave 'pb' untouched.
+  if (group == nullptr) return false;
+  // Known tablet: write the group's UUIDs into 'pb'.
+  group->CopyToPB(uuid_by_idx_, pb);
+  return true;
+}
+
+Status DataDirManager::GetDirsForGroupUnlocked(uint32_t target_size,
+                                               vector<uint16_t>* group_indices) {
+  vector<uint16_t> candidate_indices;
+  for (auto& dir_entry : data_dir_by_uuid_idx_) {
+    RETURN_NOT_OK(dir_entry.second->RefreshIsFull(DataDir::RefreshMode::ALWAYS));
+    if (!dir_entry.second->is_full()) {
+      candidate_indices.push_back(dir_entry.first);
     }
   }
+  // Only non-full dirs are candidates. Fill the group via "power of two choices":
+  // shuffle, then of the first two candidates keep the one hosting fewer tablets.
+  while (group_indices->size() < target_size && !candidate_indices.empty()) {
+    shuffle(candidate_indices.begin(), candidate_indices.end(), default_random_engine(rng_.Next()));
+    // A lone candidate is taken as-is; on a tie the second candidate wins.
+    const bool take_first = candidate_indices.size() == 1 ||
+        FindOrDie(tablets_by_uuid_idx_map_, candidate_indices[0]).size() <
+            FindOrDie(tablets_by_uuid_idx_map_, candidate_indices[1]).size();
+    const auto chosen = candidate_indices.begin() + (take_first ? 0 : 1);
+    group_indices->push_back(*chosen);
+    candidate_indices.erase(chosen);
+  }
+  return Status::OK();
 }
 
 DataDir* DataDirManager::FindDataDirByUuidIndex(uint16_t uuid_idx) const {

http://git-wip-us.apache.org/repos/asf/kudu/blob/732ee211/src/kudu/fs/data_dirs.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/data_dirs.h b/src/kudu/fs/data_dirs.h
index e8dc71a..597e9fd 100644
--- a/src/kudu/fs/data_dirs.h
+++ b/src/kudu/fs/data_dirs.h
@@ -21,14 +21,19 @@
 #include <mutex>
 #include <string>
 #include <unordered_map>
+#include <set>
 #include <vector>
 
+#include <boost/optional.hpp>
+
+#include "kudu/fs/fs.pb.h"
 #include "kudu/gutil/callback_forward.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/util/atomic.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/random.h"
 #include "kudu/util/status.h"
 
 namespace kudu {
@@ -37,9 +42,60 @@ class AtomicGauge;
 class Env;
 class MetricEntity;
 class ThreadPool;
+class DataDirGroupPB;
 
 namespace fs {
+
+typedef std::unordered_map<uint16_t, std::string> UuidByUuidIndexMap;
+typedef std::unordered_map<std::string, uint16_t> UuidIndexByUuidMap;
+
+class DataDirManager;
 class PathInstanceMetadataFile;
+struct CreateBlockOptions;
+
+const char kInstanceMetadataFileName[] = "block_manager_instance";
+
+namespace internal {
+
+// A DataDirGroup is a group of directories used by an entity for block
+// placement. A group is represented in memory by a list of 2-byte indices,
+// which index into the list of all UUIDs found in a PathSetPB. A group is
+// represented on-disk as a list of full UUIDs, and as such, when writing or
+// reading from disk, a mapping is needed to translate between index and UUID.
+//
+// The same directory may appear in multiple DataDirGroups.
+class DataDirGroup {
+ public:
+  explicit DataDirGroup(std::vector<uint16_t> uuid_indices)  // Sink parameter, moved below.
+      : uuid_indices_(std::move(uuid_indices)) {}
+
+  static DataDirGroup FromPB(const DataDirGroupPB& pb,
+                             const UuidIndexByUuidMap& uuid_idx_by_uuid) {
+    std::vector<uint16_t> uuid_indices;  // Filled in the same order as pb.uuids().
+    for (const std::string& uuid : pb.uuids()) {  // Each UUID is looked up with FindOrDie.
+      uuid_indices.push_back(FindOrDie(uuid_idx_by_uuid, uuid));
+    }
+    return DataDirGroup(std::move(uuid_indices));
+  }
+
+  void CopyToPB(const UuidByUuidIndexMap& uuid_by_uuid_idx,
+                DataDirGroupPB* pb) const {
+    DCHECK(pb);
+    DataDirGroupPB group;  // Built separately, then swapped into 'pb' at the end.
+    for (uint16_t uuid_idx : uuid_indices_) {
+      group.add_uuids(FindOrDie(uuid_by_uuid_idx, uuid_idx));
+    }
+    pb->Swap(&group);  // 'pb' ends up holding only this group's UUIDs.
+  }
+
+  const std::vector<uint16_t>& uuid_indices() const { return uuid_indices_; }
+
+ private:
+  // UUID indices corresponding to the data directories within the group.
+  const std::vector<uint16_t> uuid_indices_;
+};
+
+}  // namespace internal
 
 struct DataDirMetrics {
   explicit DataDirMetrics(const scoped_refptr<MetricEntity>& entity);
@@ -127,6 +183,11 @@ class DataDirManager {
     NONE,
   };
 
+  enum class DirDistributionMode {
+    ACROSS_ALL_DIRS,  // Stripe across all disks, ignoring the per-tablet flag.
+    USE_FLAG_SPEC,  // Honor fs_target_data_dirs_per_tablet.
+  };
+
   DataDirManager(Env* env,
                  scoped_refptr<MetricEntity> metric_entity,
                  std::string block_manager_type,
@@ -147,12 +208,39 @@ class DataDirManager {
   // 'max_data_dirs', or if 'mode' is MANDATORY and locks could not be taken.
   Status Open(int max_data_dirs, LockMode mode);
 
-  // Retrieves the next data directory that isn't full. Directories are rotated
-  // via round-robin. Full directories are skipped.
+  // Deserializes a DataDirGroupPB and associates the resulting DataDirGroup
+  // with a tablet_id.
+  //
+  // Results in an error if the tablet already exists.
+  Status LoadDataDirGroupFromPB(const std::string& tablet_id,
+                                const DataDirGroupPB& pb);
+
+  // Creates a new data dir group for the specified tablet. Adds data
+  // directories to this new group until the limit specified by
+  // fs_target_data_dirs_per_tablet, or until there is no more space.
   //
-  // Returns an error if all data directories are full, or upon filesystem
-  // error. On success, 'dir' is guaranteed to be set.
-  Status GetNextDataDir(DataDir** dir);
+  // If 'mode' is ACROSS_ALL_DIRS, ignores the above flag and stripes across
+  // all disks. This behavior is only used when loading a superblock with no
+  // DataDirGroup, allowing for backwards compatibility with data from older
+  // versions of Kudu.
+  //
+  // Results in an error if all disks are full or if the tablet already has a
+  // data dir group associated with it. If returning with an error, the
+  // DataDirManager will be unchanged.
+  Status CreateDataDirGroup(const std::string& tablet_id,
+                            DirDistributionMode mode = DirDistributionMode::USE_FLAG_SPEC);
+
+  // Deletes the group for the specified tablet. Maps from tablet_id to group
+  // and data dir to tablet set are cleared of all references to the tablet.
+  void DeleteDataDirGroup(const std::string& tablet_id);
+
+  // Serializes the DataDirGroupPB associated with the given tablet_id. Returns
+  // false if none exist.
+  bool GetDataDirGroupPB(const std::string& tablet_id, DataDirGroupPB* pb) const;
+
+  // Returns a random directory from the specified option's data dir group. If
+  // there is no room in the group, returns an error.
+  Status GetNextDataDir(const CreateBlockOptions& opts, DataDir** dir);
 
   // Finds a data directory by uuid index, returning nullptr if it can't be
   // found.
@@ -170,6 +258,25 @@ class DataDirManager {
   }
 
  private:
+  FRIEND_TEST(DataDirGroupTest, TestCreateGroup);
+  FRIEND_TEST(DataDirGroupTest, TestLoadFromPB);
+  FRIEND_TEST(DataDirGroupTest, TestLoadBalancingBias);
+  FRIEND_TEST(DataDirGroupTest, TestLoadBalancingDistribution);
+
+  // Repeatedly selects directories from those available to put into a new
+  // DataDirGroup until 'group_indices' reaches 'target_size' elements.
+  // Selection is based on "The Power of Two Choices in Randomized Load
+  // Balancing", selecting two directories randomly and choosing the one with
+  // less load, quantified as the number of unique tablets in the directory.
+  // The resulting behavior fills directories that have fewer tablets stored on
+  // them while not completely neglecting those with more tablets.
+  //
+  // 'group_indices' is an output that stores the list of uuid_indices to be
+  // added. Although this function does not itself change DataDirManager state,
+  // its expected usage warrants that it is called within the scope of a
+  // lock_guard of dir_group_lock_. May yield fewer than 'target_size' indices.
+  Status GetDirsForGroupUnlocked(uint32_t target_size, std::vector<uint16_t>* group_indices);
+
   Env* env_;
   const std::string block_manager_type_;
   const std::vector<std::string> paths_;
@@ -178,14 +285,31 @@ class DataDirManager {
 
   std::vector<std::unique_ptr<DataDir>> data_dirs_;
 
-  AtomicInt<int32_t> data_dirs_next_;
-
   typedef std::unordered_map<uint16_t, DataDir*> UuidIndexMap;
   UuidIndexMap data_dir_by_uuid_idx_;
 
   typedef std::unordered_map<DataDir*, uint16_t> ReverseUuidIndexMap;
   ReverseUuidIndexMap uuid_idx_by_data_dir_;
 
+  typedef std::unordered_map<std::string, internal::DataDirGroup> TabletDataDirGroupMap;
+  TabletDataDirGroupMap group_by_tablet_map_;
+
+  typedef std::unordered_map<uint16_t, std::set<std::string>> TabletsByUuidIndexMap;
+  TabletsByUuidIndexMap tablets_by_uuid_idx_map_;
+
+  UuidByUuidIndexMap uuid_by_idx_;
+  UuidIndexByUuidMap idx_by_uuid_;
+
+  // Lock protecting reads and writes to the dir group maps.
+  // A percpu_rwlock is used so threads attempting to read (e.g. to get the
+  // next data directory for a Flush()) do not block each other, while threads
+  // attempting to write (e.g. to create a new tablet, thereby creating a new
+  // data directory group) block all threads.
+  mutable percpu_rwlock dir_group_lock_;
+
+  // RNG used to select directories.
+  ThreadSafeRandom rng_;
+
   DISALLOW_COPY_AND_ASSIGN(DataDirManager);
 };
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/732ee211/src/kudu/fs/file_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/file_block_manager.cc b/src/kudu/fs/file_block_manager.cc
index c0c268a..80df53b 100644
--- a/src/kudu/fs/file_block_manager.cc
+++ b/src/kudu/fs/file_block_manager.cc
@@ -585,7 +585,7 @@ Status FileBlockManager::CreateBlock(const CreateBlockOptions& opts,
   CHECK(!read_only_);
 
   DataDir* dir;
-  RETURN_NOT_OK(dd_manager_.GetNextDataDir(&dir));
+  RETURN_NOT_OK(dd_manager_.GetNextDataDir(opts, &dir));
   uint16_t uuid_idx;
   CHECK(dd_manager_.FindUuidIndexByDataDir(dir, &uuid_idx));
 
@@ -637,10 +637,6 @@ Status FileBlockManager::CreateBlock(const CreateBlockOptions& opts,
   return s;
 }
 
-Status FileBlockManager::CreateBlock(unique_ptr<WritableBlock>* block) {
-  return CreateBlock(CreateBlockOptions(), block);
-}
-
 Status FileBlockManager::OpenBlock(const BlockId& block_id,
                                    unique_ptr<ReadableBlock>* block) {
   string path;

http://git-wip-us.apache.org/repos/asf/kudu/blob/732ee211/src/kudu/fs/file_block_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/file_block_manager.h b/src/kudu/fs/file_block_manager.h
index ebedfb1..1a83ffa 100644
--- a/src/kudu/fs/file_block_manager.h
+++ b/src/kudu/fs/file_block_manager.h
@@ -62,10 +62,8 @@ struct BlockManagerMetrics;
 // resolved back into a filesystem path when the block is opened for reading.
 // The structure of this ID limits the block manager to at most 65,536 disks.
 //
-// When creating blocks, the block manager will round robin through the
-// available filesystem paths.
-//
-// TODO: Support path-based block placement hints.
+// When creating blocks, the block manager will place blocks based on the
+// provided CreateBlockOptions.
 
 // The file-backed block manager.
 class FileBlockManager : public BlockManager {
@@ -85,8 +83,6 @@ class FileBlockManager : public BlockManager {
   Status CreateBlock(const CreateBlockOptions& opts,
                      std::unique_ptr<WritableBlock>* block) override;
 
-  Status CreateBlock(std::unique_ptr<WritableBlock>* block) override;
-
   Status OpenBlock(const BlockId& block_id,
                    std::unique_ptr<ReadableBlock>* block) override;
 
@@ -96,6 +92,8 @@ class FileBlockManager : public BlockManager {
 
   Status GetAllBlockIds(std::vector<BlockId>* block_ids) override;
 
+  DataDirManager* dd_manager() override { return &dd_manager_; }  // Not owned by caller.
+
  private:
   friend class internal::FileBlockLocation;
   friend class internal::FileReadableBlock;

http://git-wip-us.apache.org/repos/asf/kudu/blob/732ee211/src/kudu/fs/fs.proto
----------------------------------------------------------------------
diff --git a/src/kudu/fs/fs.proto b/src/kudu/fs/fs.proto
index aa8effc..241187e 100644
--- a/src/kudu/fs/fs.proto
+++ b/src/kudu/fs/fs.proto
@@ -110,3 +110,10 @@ message BlockRecordPB {
   // Required for CREATE.
   optional int64 length = 5;
 }
+
+// Tablet data is spread across a specified number of data directories. The
+// group is represented by the UUIDs of the data directories it consists of.
+message DataDirGroupPB {
+  // List of data directory UUIDs that make up the group. Must not be empty.
+  repeated bytes uuids = 1;  // Full UUIDs, not the in-memory 2-byte indices.
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/732ee211/src/kudu/fs/fs_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/fs_manager-test.cc b/src/kudu/fs/fs_manager-test.cc
index 6b38980..d7ea3be 100644
--- a/src/kudu/fs/fs_manager-test.cc
+++ b/src/kudu/fs/fs_manager-test.cc
@@ -76,7 +76,7 @@ class FsManagerTestBase : public KuduTest {
 
     // Test Write
     unique_ptr<fs::WritableBlock> writer;
-    ASSERT_OK(fs_manager()->CreateNewBlock(&writer));
+    ASSERT_OK(fs_manager()->CreateNewBlock({}, &writer));
     ASSERT_OK(writer->Append(data));
     ASSERT_OK(writer->Close());
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/732ee211/src/kudu/fs/fs_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/fs_manager.cc b/src/kudu/fs/fs_manager.cc
index 14ca730..b48cfa2 100644
--- a/src/kudu/fs/fs_manager.cc
+++ b/src/kudu/fs/fs_manager.cc
@@ -80,6 +80,8 @@ TAG_FLAG(fs_data_dirs, stable);
 
 using kudu::env_util::ScopedFileDeleter;
 using kudu::fs::BlockManagerOptions;
+using kudu::fs::CreateBlockOptions;
+using kudu::fs::DataDirManager;
 using kudu::fs::FileBlockManager;
 using kudu::fs::FsReport;
 using kudu::fs::LogBlockManager;
@@ -411,6 +413,10 @@ vector<string> FsManager::GetDataRootDirs() const {
   return data_paths;
 }
 
+DataDirManager* FsManager::dd_manager() const {
+  return block_manager_->dd_manager();  // Delegates to the block manager's accessor.
+}
+
 string FsManager::GetTabletMetadataDir() const {
   DCHECK(initted_);
   return JoinPathSegments(canonicalized_metadata_fs_root_, kTabletMetadataDirName);
@@ -529,10 +535,10 @@ void FsManager::DumpFileSystemTree(ostream& out, const string& prefix,
 //  Data read/write interfaces
 // ==========================================================================
 
-Status FsManager::CreateNewBlock(unique_ptr<WritableBlock>* block) {
+Status FsManager::CreateNewBlock(const CreateBlockOptions& opts, unique_ptr<WritableBlock>* block) {
   CHECK(!read_only_);
 
-  return block_manager_->CreateBlock(block);
+  return block_manager_->CreateBlock(opts, block);  // Forwards 'opts' unchanged.
 }
 
 Status FsManager::OpenBlock(const BlockId& block_id, unique_ptr<ReadableBlock>* block) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/732ee211/src/kudu/fs/fs_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/fs_manager.h b/src/kudu/fs/fs_manager.h
index 3ade3ff..b451fed 100644
--- a/src/kudu/fs/fs_manager.h
+++ b/src/kudu/fs/fs_manager.h
@@ -44,9 +44,12 @@ class MetricEntity;
 
 namespace fs {
 class BlockManager;
+class DataDirManager;
 class ReadableBlock;
 class WritableBlock;
 struct FsReport;
+
+struct CreateBlockOptions;
 } // namespace fs
 
 namespace itest {
@@ -132,10 +135,11 @@ class FsManager {
   //  Data read/write interfaces
   // ==========================================================================
 
-  // Creates a new anonymous block.
+  // Creates a new block based on the options specified in 'opts'.
   //
   // Block will be synced on close.
-  Status CreateNewBlock(std::unique_ptr<fs::WritableBlock>* block);
+  Status CreateNewBlock(const fs::CreateBlockOptions& opts,
+                        std::unique_ptr<fs::WritableBlock>* block);
 
   Status OpenBlock(const BlockId& block_id,
                    std::unique_ptr<fs::ReadableBlock>* block);
@@ -186,7 +190,7 @@ class FsManager {
     return JoinPathSegments(GetConsensusMetadataDir(), tablet_id);
   }
 
-  Env *env() { return env_; }
+  Env* env() { return env_; }
 
   bool read_only() const {
     return read_only_;
@@ -205,6 +209,8 @@ class FsManager {
 
   Status CreateDirIfMissing(const std::string& path, bool* created = NULL);
 
+  fs::DataDirManager* dd_manager() const;
+
   fs::BlockManager* block_manager() {
     return block_manager_.get();
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/732ee211/src/kudu/fs/log_block_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager-test.cc b/src/kudu/fs/log_block_manager-test.cc
index 5a57552..f8f155a 100644
--- a/src/kudu/fs/log_block_manager-test.cc
+++ b/src/kudu/fs/log_block_manager-test.cc
@@ -62,6 +62,8 @@ namespace fs {
 class LogBlockManagerTest : public KuduTest {
  public:
   LogBlockManagerTest() :
+    test_tablet_name_("test_tablet"),
+    test_block_opts_({ test_tablet_name_ }),
     bm_(CreateBlockManager(scoped_refptr<MetricEntity>())) {
   }
 
@@ -71,6 +73,8 @@ class LogBlockManagerTest : public KuduTest {
     // Pass in a report to prevent the block manager from logging unnecessarily.
     FsReport report;
     CHECK_OK(bm_->Open(&report));
+    CHECK_OK(bm_->dd_manager()->CreateDataDirGroup(test_tablet_name_));
+    CHECK(bm_->dd_manager()->GetDataDirGroupPB(test_tablet_name_, &test_group_pb_));
   }
 
  protected:
@@ -84,6 +88,7 @@ class LogBlockManagerTest : public KuduTest {
   Status ReopenBlockManager(FsReport* report = nullptr) {
     bm_.reset(CreateBlockManager(scoped_refptr<MetricEntity>()));
     RETURN_NOT_OK(bm_->Open(report));
+    RETURN_NOT_OK(bm_->dd_manager()->LoadDataDirGroupFromPB(test_tablet_name_, test_group_pb_));
     return Status::OK();
   }
 
@@ -135,6 +140,10 @@ class LogBlockManagerTest : public KuduTest {
     ASSERT_TRUE(report.partial_record_check->entries.empty());
   }
 
+  DataDirGroupPB test_group_pb_;
+  string test_tablet_name_;
+  CreateBlockOptions test_block_opts_;
+
   unique_ptr<LogBlockManager> bm_;
 
  private:
@@ -208,6 +217,7 @@ TEST_F(LogBlockManagerTest, MetricsTest) {
   scoped_refptr<MetricEntity> entity = METRIC_ENTITY_server.Instantiate(&registry, "test");
   bm_.reset(CreateBlockManager(entity));
   ASSERT_OK(bm_->Open(nullptr));
+  ASSERT_OK(bm_->dd_manager()->LoadDataDirGroupFromPB(test_tablet_name_, test_group_pb_));
   ASSERT_NO_FATAL_FAILURE(CheckLogMetrics(entity, 0, 0, 0, 0));
 
   // Lower the max container size so that we can more easily test full
@@ -216,7 +226,7 @@ TEST_F(LogBlockManagerTest, MetricsTest) {
 
   // One block --> one container.
   unique_ptr<WritableBlock> writer;
-  ASSERT_OK(bm_->CreateBlock(&writer));
+  ASSERT_OK(bm_->CreateBlock(test_block_opts_, &writer));
   ASSERT_NO_FATAL_FAILURE(CheckLogMetrics(entity, 0, 0, 1, 0));
 
   // And when the block is closed, it becomes "under management".
@@ -231,7 +241,7 @@ TEST_F(LogBlockManagerTest, MetricsTest) {
     ScopedWritableBlockCloser closer;
     for (int i = 0; i < 10; i++) {
       unique_ptr<WritableBlock> b;
-      ASSERT_OK(bm_->CreateBlock(&b));
+      ASSERT_OK(bm_->CreateBlock(test_block_opts_, &b));
       if (saved_id.IsNull()) {
         saved_id = b->id();
       }
@@ -272,7 +282,7 @@ TEST_F(LogBlockManagerTest, ContainerPreallocationTest) {
   // Create a block with some test data. This should also trigger
   // preallocation of the container, provided it's supported by the kernel.
   unique_ptr<WritableBlock> written_block;
-  ASSERT_OK(bm_->CreateBlock(&written_block));
+  ASSERT_OK(bm_->CreateBlock(test_block_opts_, &written_block));
   ASSERT_OK(written_block->Append(kTestData));
   ASSERT_OK(written_block->Close());
 
@@ -286,7 +296,7 @@ TEST_F(LogBlockManagerTest, ContainerPreallocationTest) {
 
   // Upon writing a second block, we'd expect the container to remain the same
   // size.
-  ASSERT_OK(bm_->CreateBlock(&written_block));
+  ASSERT_OK(bm_->CreateBlock(test_block_opts_, &written_block));
   ASSERT_OK(written_block->Append(kTestData));
   ASSERT_OK(written_block->Close());
   NO_FATALS(GetOnlyContainerDataFile(&container_data_filename));
@@ -296,7 +306,7 @@ TEST_F(LogBlockManagerTest, ContainerPreallocationTest) {
   // Now reopen the block manager and create another block. The block manager
   // should be smart enough to reuse the previously preallocated amount.
   ASSERT_OK(ReopenBlockManager());
-  ASSERT_OK(bm_->CreateBlock(&written_block));
+  ASSERT_OK(bm_->CreateBlock(test_block_opts_, &written_block));
   ASSERT_OK(written_block->Append(kTestData));
   ASSERT_OK(written_block->Close());
   NO_FATALS(GetOnlyContainerDataFile(&container_data_filename));
@@ -318,7 +328,7 @@ TEST_F(LogBlockManagerTest, TestReuseBlockIds) {
     ScopedWritableBlockCloser closer;
     for (int i = 0; i < 4; i++) {
       unique_ptr<WritableBlock> writer;
-      ASSERT_OK(bm_->CreateBlock(&writer));
+      ASSERT_OK(bm_->CreateBlock(test_block_opts_, &writer));
       block_ids.push_back(writer->id());
       closer.AddBlock(std::move(writer));
     }
@@ -328,7 +338,7 @@ TEST_F(LogBlockManagerTest, TestReuseBlockIds) {
   // Create one more block, which should reuse the first container.
   {
     unique_ptr<WritableBlock> writer;
-    ASSERT_OK(bm_->CreateBlock(&writer));
+    ASSERT_OK(bm_->CreateBlock(test_block_opts_, &writer));
     ASSERT_OK(writer->Close());
   }
 
@@ -345,7 +355,7 @@ TEST_F(LogBlockManagerTest, TestReuseBlockIds) {
   bm_->next_block_id_.Store(1);
   for (int i = 0; i < 4; i++) {
     unique_ptr<WritableBlock> writer;
-    ASSERT_OK(bm_->CreateBlock(&writer));
+    ASSERT_OK(bm_->CreateBlock(test_block_opts_, &writer));
     ASSERT_EQ(writer->id(), block_ids[i]);
     ASSERT_OK(writer->Close());
   }
@@ -380,7 +390,7 @@ TEST_F(LogBlockManagerTest, TestMetadataTruncation) {
   BlockId last_block_id;
   for (int i = 0; i < 4; i++) {
     unique_ptr<WritableBlock> writer;
-    ASSERT_OK(bm_->CreateBlock(&writer));
+    ASSERT_OK(bm_->CreateBlock(test_block_opts_, &writer));
     last_block_id = writer->id();
     created_blocks.push_back(last_block_id);
     ASSERT_OK(writer->Close());
@@ -443,7 +453,7 @@ TEST_F(LogBlockManagerTest, TestMetadataTruncation) {
   // Add a new block, increasing the size of the container metadata file.
   {
     unique_ptr<WritableBlock> writer;
-    ASSERT_OK(bm_->CreateBlock(&writer));
+    ASSERT_OK(bm_->CreateBlock(test_block_opts_, &writer));
     last_block_id = writer->id();
     created_blocks.push_back(last_block_id);
     ASSERT_OK(writer->Close());
@@ -485,7 +495,7 @@ TEST_F(LogBlockManagerTest, TestMetadataTruncation) {
   // Add a new block, increasing the size of the container metadata file.
   {
     unique_ptr<WritableBlock> writer;
-    ASSERT_OK(bm_->CreateBlock(&writer));
+    ASSERT_OK(bm_->CreateBlock(test_block_opts_, &writer));
     last_block_id = writer->id();
     created_blocks.push_back(last_block_id);
     ASSERT_OK(writer->Close());
@@ -554,13 +564,13 @@ TEST_F(LogBlockManagerTest, TestAppendExceedsPreallocation) {
 
   // Create a container, preallocate it by one byte, and append more than one.
   unique_ptr<WritableBlock> writer;
-  ASSERT_OK(bm_->CreateBlock(&writer));
+  ASSERT_OK(bm_->CreateBlock(test_block_opts_, &writer));
   ASSERT_OK(writer->Append("hello world"));
   ASSERT_OK(writer->Close());
 
   // On second append, don't crash just because the append offset is ahead of
   // the preallocation offset!
-  ASSERT_OK(bm_->CreateBlock(&writer));
+  ASSERT_OK(bm_->CreateBlock(test_block_opts_, &writer));
   ASSERT_OK(writer->Append("hello world"));
 }
 
@@ -571,7 +581,7 @@ TEST_F(LogBlockManagerTest, TestPreallocationAndTruncation) {
 
   // Fill up one container.
   unique_ptr<WritableBlock> writer;
-  ASSERT_OK(bm_->CreateBlock(&writer));
+  ASSERT_OK(bm_->CreateBlock(test_block_opts_, &writer));
   unique_ptr<uint8_t[]> data(new uint8_t[FLAGS_log_container_max_size]);
   memset(data.get(), 0, FLAGS_log_container_max_size);
   ASSERT_OK(writer->Append({ data.get(), FLAGS_log_container_max_size } ));
@@ -655,7 +665,7 @@ TEST_F(LogBlockManagerTest, TestContainerWithManyHoles) {
   vector<BlockId> ids;
   for (int i = 0; i < kNumBlocks; i++) {
     unique_ptr<WritableBlock> block;
-    ASSERT_OK(bm_->CreateBlock(&block));
+    ASSERT_OK(bm_->CreateBlock(test_block_opts_, &block));
     ASSERT_OK(block->Append("aaaa"));
     ASSERT_OK(block->Close());
     ids.push_back(block->id());
@@ -741,7 +751,7 @@ TEST_F(LogBlockManagerTest, TestContainerBlockLimiting) {
   auto create_some_blocks = [&]() -> Status {
     for (int i = 0; i < kNumBlocks; i++) {
       unique_ptr<WritableBlock> block;
-      RETURN_NOT_OK(bm_->CreateBlock(&block));
+      RETURN_NOT_OK(bm_->CreateBlock(test_block_opts_, &block));
       RETURN_NOT_OK(block->Append("aaaa"));
       RETURN_NOT_OK(block->Close());
     }
@@ -774,7 +784,7 @@ TEST_F(LogBlockManagerTest, TestMisalignedBlocksFuzz) {
 
   // Create one container.
   unique_ptr<WritableBlock> block;
-  ASSERT_OK(bm_->CreateBlock(&block));
+  ASSERT_OK(bm_->CreateBlock(test_block_opts_, &block));
   ASSERT_OK(block->Close());
   string container_name;
   NO_FATALS(GetOnlyContainer(&container_name));
@@ -796,7 +806,7 @@ TEST_F(LogBlockManagerTest, TestMisalignedBlocksFuzz) {
       num_misaligned_blocks++;
     } else {
       unique_ptr<WritableBlock> block;
-      ASSERT_OK(bm_->CreateBlock(&block));
+      ASSERT_OK(bm_->CreateBlock(test_block_opts_, &block));
 
       // Append at least once to ensure that the data file grows.
       //
@@ -883,7 +893,7 @@ TEST_F(LogBlockManagerTest, TestRepairPreallocateExcessSpace) {
     ScopedWritableBlockCloser closer;
     for (int i = 0; i < kNumContainers; i++) {
       unique_ptr<WritableBlock> block;
-      ASSERT_OK(bm_->CreateBlock(&block));
+      ASSERT_OK(bm_->CreateBlock(test_block_opts_, &block));
       ASSERT_OK(block->Append("a"));
       closer.AddBlock(std::move(block));
     }
@@ -928,7 +938,7 @@ TEST_F(LogBlockManagerTest, TestRepairUnpunchedBlocks) {
 
   // Create one container.
   unique_ptr<WritableBlock> block;
-  ASSERT_OK(bm_->CreateBlock(&block));
+  ASSERT_OK(bm_->CreateBlock(test_block_opts_, &block));
   ASSERT_OK(block->Close());
   string data_file;
   NO_FATALS(GetOnlyContainerDataFile(&data_file));
@@ -1007,7 +1017,7 @@ TEST_F(LogBlockManagerTest, TestDetectMalformedRecords) {
 
   // Create one container.
   unique_ptr<WritableBlock> block;
-  ASSERT_OK(bm_->CreateBlock(&block));
+  ASSERT_OK(bm_->CreateBlock(test_block_opts_, &block));
   ASSERT_OK(block->Append("a"));
   ASSERT_OK(block->Close());
   string container_name;
@@ -1039,7 +1049,7 @@ TEST_F(LogBlockManagerTest, TestDetectMisalignedBlocks) {
 
   // Create one container.
   unique_ptr<WritableBlock> block;
-  ASSERT_OK(bm_->CreateBlock(&block));
+  ASSERT_OK(bm_->CreateBlock(test_block_opts_, &block));
   ASSERT_OK(block->Append("a"));
   ASSERT_OK(block->Close());
   string container_name;
@@ -1075,7 +1085,7 @@ TEST_F(LogBlockManagerTest, TestRepairPartialRecords) {
     ScopedWritableBlockCloser closer;
     for (int i = 0; i < kNumContainers; i++) {
       unique_ptr<WritableBlock> block;
-      ASSERT_OK(bm_->CreateBlock(&block));
+      ASSERT_OK(bm_->CreateBlock(test_block_opts_, &block));
       ASSERT_OK(block->Append("a"));
       closer.AddBlock(std::move(block));
     }
@@ -1113,7 +1123,7 @@ TEST_F(LogBlockManagerTest, TestDeleteDeadContainersAtStartup) {
 
   // Create one container.
   unique_ptr<WritableBlock> block;
-  ASSERT_OK(bm_->CreateBlock(&block));
+  ASSERT_OK(bm_->CreateBlock(test_block_opts_, &block));
   ASSERT_OK(block->Append("a"));
   ASSERT_OK(block->Close());
   string data_file_name;
@@ -1146,7 +1156,7 @@ TEST_F(LogBlockManagerTest, TestCompactFullContainerMetadataAtStartup) {
   vector<BlockId> block_ids;
   for (int i = 0; i < FLAGS_log_container_max_blocks; i++) {
     unique_ptr<WritableBlock> block;
-    ASSERT_OK(bm_->CreateBlock(&block));
+    ASSERT_OK(bm_->CreateBlock(test_block_opts_, &block));
     ASSERT_OK(block->Append("a"));
     ASSERT_OK(block->Close());
     block_ids.emplace_back(block->id());