You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2019/11/13 07:16:42 UTC

[kudu] branch master updated: KUDU-2977 Sharding block map to speed up tserver startup

This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 98f44f4  KUDU-2977 Sharding block map to speed up tserver startup
98f44f4 is described below

commit 98f44f4537ceddffedaf9afce26b634c4ab2142a
Author: Yingchun Lai <40...@qq.com>
AuthorDate: Fri Oct 18 11:06:08 2019 +0800

    KUDU-2977 Sharding block map to speed up tserver startup
    
    Separate LogBlockManager's block lock from its container lock,
    and shard the block map.
    
    A tablet server with multiple data directories takes a long time to
    start up: there is one thread per data directory, and each of them
    must acquire a global lock to add the blocks found in its own data
    directory to the global block map, so the threads queue up on that
    lock, which may cost much time. The same happens when removing a
    batch of blocks.
    
    After a bunch of benchmarks, we found a pretty stable improvement
    from this patch; the time saved grows as the block count increases
    (startup time is reduced by about 20% with 10,000,000 blocks on 8
    data directories).
    
    Sharding count is 8, which has the best performance after comparing
    with 1, 2, 4, 8, 16, 32, 64 and 128.
    
    Change-Id: If0d5c13e051a2c1d6cfd1c9ad7db8a3cd195459d
    Reviewed-on: http://gerrit.cloudera.org:8080/14555
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
    Tested-by: Kudu Jenkins
---
 src/kudu/fs/log_block_manager-test.cc |  62 +++++++++++----
 src/kudu/fs/log_block_manager.cc      | 146 ++++++++++++++++++----------------
 src/kudu/fs/log_block_manager.h       |  48 ++++++-----
 3 files changed, 153 insertions(+), 103 deletions(-)

diff --git a/src/kudu/fs/log_block_manager-test.cc b/src/kudu/fs/log_block_manager-test.cc
index 897789f..2804d82 100644
--- a/src/kudu/fs/log_block_manager-test.cc
+++ b/src/kudu/fs/log_block_manager-test.cc
@@ -79,6 +79,7 @@ DECLARE_bool(crash_on_eio);
 DECLARE_double(env_inject_eio);
 DECLARE_double(log_container_excess_space_before_cleanup_fraction);
 DECLARE_double(log_container_live_metadata_before_compact_ratio);
+DECLARE_int32(fs_target_data_dirs_per_tablet);
 DECLARE_int64(block_manager_max_open_files);
 DECLARE_int64(log_container_max_blocks);
 DECLARE_string(block_manager_preflush_control);
@@ -122,10 +123,13 @@ class LogBlockManagerTest : public KuduTest {
   }
 
  protected:
-  LogBlockManager* CreateBlockManager(const scoped_refptr<MetricEntity>& metric_entity) {
+  LogBlockManager* CreateBlockManager(const scoped_refptr<MetricEntity>& metric_entity,
+                                      std::vector<std::string> test_data_dirs = {}) {
+    PrepareDataDirs(&test_data_dirs);
+
     if (!dd_manager_) {
       // Ensure the directory manager is initialized.
-      CHECK_OK(DataDirManager::CreateNewForTests(env_, { test_dir_ },
+      CHECK_OK(DataDirManager::CreateNewForTests(env_, test_data_dirs,
           DataDirManagerOptions(), &dd_manager_));
     }
     BlockManagerOptions opts;
@@ -135,18 +139,30 @@ class LogBlockManagerTest : public KuduTest {
   }
 
   Status ReopenBlockManager(const scoped_refptr<MetricEntity>& metric_entity = nullptr,
-                            FsReport* report = nullptr) {
+                            FsReport* report = nullptr,
+                            std::vector<std::string> test_data_dirs = {},
+                            bool force = false) {
+    PrepareDataDirs(&test_data_dirs);
+
     // The directory manager must outlive the block manager. Destroy the block
     // manager first to enforce this.
     bm_.reset();
 
-    // Re-open the directory manager first to clear any in-memory maps.
-    RETURN_NOT_OK(DataDirManager::OpenExistingForTests(env_, { test_dir_ },
-        DataDirManagerOptions(), &dd_manager_));
+    if (force) {
+      // Ensure the directory manager is initialized.
+      CHECK_OK(DataDirManager::CreateNewForTests(env_, test_data_dirs,
+          DataDirManagerOptions(), &dd_manager_));
+      RETURN_NOT_OK(dd_manager_->CreateDataDirGroup(test_tablet_name_));
+      RETURN_NOT_OK(dd_manager_->GetDataDirGroupPB(test_tablet_name_, &test_group_pb_));
+    } else {
+      // Re-open the directory manager first to clear any in-memory maps.
+      RETURN_NOT_OK(DataDirManager::OpenExistingForTests(env_, test_data_dirs,
+                                                         DataDirManagerOptions(), &dd_manager_));
+      RETURN_NOT_OK(dd_manager_->LoadDataDirGroupFromPB(test_tablet_name_, test_group_pb_));
+    }
 
-    bm_.reset(CreateBlockManager(metric_entity));
+    bm_.reset(CreateBlockManager(metric_entity, test_data_dirs));
     RETURN_NOT_OK(bm_->Open(report));
-    RETURN_NOT_OK(dd_manager_->LoadDataDirGroupFromPB(test_tablet_name_, test_group_pb_));
     return Status::OK();
   }
 
@@ -255,6 +271,16 @@ class LogBlockManagerTest : public KuduTest {
         break;
     }
   }
+  void PrepareDataDirs(std::vector<std::string>* test_data_dirs) {
+    if (test_data_dirs->empty()) {
+      *test_data_dirs = { test_dir_ };
+    }
+    for (const auto& test_data_dir : *test_data_dirs) {
+      Status s = Env::Default()->CreateDir(test_data_dir);
+      CHECK(s.IsAlreadyPresent() || s.ok())
+          << "Could not create directory " << test_data_dir << ": " << s.ToString();
+    }
+  }
 };
 
 static void CheckGaugeMetric(const scoped_refptr<MetricEntity>& entity,
@@ -262,24 +288,24 @@ static void CheckGaugeMetric(const scoped_refptr<MetricEntity>& entity,
   AtomicGauge<uint64_t>* gauge = down_cast<AtomicGauge<uint64_t>*>(
       entity->FindOrNull(*prototype).get());
   DCHECK(gauge);
-  ASSERT_EQ(expected_value, gauge->value());
+  ASSERT_EQ(expected_value, gauge->value()) << prototype->name();
 }
 
 static void CheckCounterMetric(const scoped_refptr<MetricEntity>& entity,
                                int expected_value, const MetricPrototype* prototype) {
   Counter* counter = down_cast<Counter*>(entity->FindOrNull(*prototype).get());
   DCHECK(counter);
-  ASSERT_EQ(expected_value, counter->value());
+  ASSERT_EQ(expected_value, counter->value()) << prototype->name();
 }
 
 static void CheckLogMetrics(const scoped_refptr<MetricEntity>& entity,
                             const vector<std::pair<int, const MetricPrototype*>> gauge_values,
                             const vector<std::pair<int, const MetricPrototype*>> counter_values) {
   for (const auto& gauge_value : gauge_values) {
-    CheckGaugeMetric(entity, gauge_value.first, gauge_value.second);
+    NO_FATALS(CheckGaugeMetric(entity, gauge_value.first, gauge_value.second));
   }
   for (const auto& counter_value: counter_values) {
-    CheckCounterMetric(entity, counter_value.first, counter_value.second);
+    NO_FATALS(CheckCounterMetric(entity, counter_value.first, counter_value.second));
   }
 }
 
@@ -971,7 +997,6 @@ TEST_F(LogBlockManagerTest, TestParseKernelRelease) {
 // times the startup of the LBM.
 //
 // This is simplistic in several ways compared to a typical workload:
-// - only one data directory (typical servers have several)
 // - minimal number of containers, each of which is entirely full
 //   (typical workloads end up writing to several containers at once
 //    due to concurrent write operations such as multiple MM threads
@@ -980,6 +1005,15 @@ TEST_F(LogBlockManagerTest, TestParseKernelRelease) {
 //
 // However it still can be used to micro-optimize the startup process.
 TEST_F(LogBlockManagerTest, StartupBenchmark) {
+  const int kTestDataDirCount = 8;
+  FLAGS_fs_target_data_dirs_per_tablet = kTestDataDirCount;
+  std::vector<std::string> test_dirs;
+  for (int i = 0; i < kTestDataDirCount; ++i) {
+    test_dirs.emplace_back(test_dir_ + "/" + std::to_string(i));
+  }
+  // Re-open block manager to place data on multiple data directories.
+  ASSERT_OK(ReopenBlockManager(nullptr, nullptr, test_dirs, /* force= */ true));
+
   // Disable preflushing since this can slow down our writes. In particular,
   // since we write such small blocks in this test, each block will likely
   // begin on the same 4KB page as the prior one we wrote, and due to the
@@ -1004,7 +1038,7 @@ TEST_F(LogBlockManagerTest, StartupBenchmark) {
   }
   for (int i = 0; i < 10; i++) {
     LOG_TIMING(INFO, "reopening block manager") {
-      ASSERT_OK(ReopenBlockManager());
+      ASSERT_OK(ReopenBlockManager(nullptr, nullptr, test_dirs));
     }
   }
 }
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index a06f319..beda716 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -1714,7 +1714,7 @@ void LogWritableBlock::DoClose() {
     container_->FinalizeBlock(block_offset_, block_length_);
   }
 
-  LogBlockRefPtr lb = container_->block_manager()->AddLogBlock(
+  LogBlockRefPtr lb = container_->block_manager()->CreateAndAddLogBlock(
       container_, block_id_, block_offset_, block_length_);
   CHECK(lb);
   container_->BlockCreated(lb);
@@ -1857,6 +1857,8 @@ size_t LogReadableBlock::memory_footprint() const {
 // LogBlockManager
 ////////////////////////////////////////////////////////////
 
+static const uint64_t kBlockMapChunk = 1 << 4;
+static const uint64_t kBlockMapMask = kBlockMapChunk - 1;
 const char* LogBlockManager::kContainerMetadataFileSuffix = ".metadata";
 const char* LogBlockManager::kContainerDataFileSuffix = ".data";
 
@@ -1880,13 +1882,18 @@ LogBlockManager::LogBlockManager(Env* env,
                                            opts_.parent_mem_tracker)),
     file_cache_("lbm", env, GetFileCacheCapacityForBlockManager(env),
                 opts_.metric_entity),
-    blocks_by_block_id_(10,
-                        BlockMap::hasher(),
-                        BlockMap::key_equal(),
-                        BlockAllocator(mem_tracker_)),
     buggy_el6_kernel_(IsBuggyEl6Kernel(env->GetKernelRelease())),
     next_block_id_(1) {
-  blocks_by_block_id_.set_deleted_key(BlockId());
+  managed_block_shards_.resize(kBlockMapChunk);
+  for (auto& mb : managed_block_shards_) {
+    mb.lock = std::unique_ptr<simple_spinlock>(new simple_spinlock());
+    mb.blocks_by_block_id
+        = std::unique_ptr<BlockMap>(new BlockMap(10,
+                                                 BlockMap::hasher(),
+                                                 BlockMap::key_equal(),
+                                                 BlockAllocator(mem_tracker_)));
+    mb.blocks_by_block_id->set_deleted_key(BlockId());
+  }
 
   // HACK: when running in a test environment, we often instantiate many
   // LogBlockManagers in the same process, eg corresponding to different
@@ -1908,14 +1915,18 @@ LogBlockManager::LogBlockManager(Env* env,
 LogBlockManager::~LogBlockManager() {
   // Release all of the memory accounted by the blocks.
   int64_t mem = 0;
-  for (const auto& entry : blocks_by_block_id_) {
-    mem += kudu_malloc_usable_size(entry.second.get());
+  for (const auto& mb : managed_block_shards_) {
+    for (const auto& entry : *mb.blocks_by_block_id) {
+      mem += kudu_malloc_usable_size(entry.second.get());
+    }
   }
   mem_tracker_->Release(mem);
 
   // A LogBlock's destructor depends on its container, so all LogBlocks must be
   // destroyed before their containers.
-  blocks_by_block_id_.clear();
+  for (auto& mb : managed_block_shards_) {
+    mb.blocks_by_block_id->clear();
+  }
 
   // Containers may have outstanding tasks running on data directories; wait
   // for them to complete before destroying the containers.
@@ -2053,8 +2064,9 @@ Status LogBlockManager::OpenBlock(const BlockId& block_id,
                                   unique_ptr<ReadableBlock>* block) {
   LogBlockRefPtr lb;
   {
-    std::lock_guard<simple_spinlock> l(lock_);
-    lb = FindPtrOrNull(blocks_by_block_id_, block_id);
+    int index = block_id.id() & kBlockMapMask;
+    std::lock_guard<simple_spinlock> l(*managed_block_shards_[index].lock);
+    lb = FindPtrOrNull(*managed_block_shards_[index].blocks_by_block_id, block_id);
   }
   if (!lb) {
     return Status::NotFound("Can't find block", block_id.ToString());
@@ -2078,9 +2090,12 @@ shared_ptr<BlockDeletionTransaction> LogBlockManager::NewDeletionTransaction() {
 }
 
 Status LogBlockManager::GetAllBlockIds(vector<BlockId>* block_ids) {
-  std::lock_guard<simple_spinlock> l(lock_);
-  block_ids->assign(open_block_ids_.begin(), open_block_ids_.end());
-  AppendKeysFromMap(blocks_by_block_id_, block_ids);
+  block_ids->clear();
+  for (const auto& mb : managed_block_shards_) {
+    std::lock_guard<simple_spinlock> l(*mb.lock);
+    AppendKeysFromMap(*mb.blocks_by_block_id, block_ids);
+    block_ids->insert(block_ids->end(), mb.open_block_ids.begin(), mb.open_block_ids.end());
+  }
   return Status::OK();
 }
 
@@ -2208,35 +2223,37 @@ bool LogBlockManager::TryUseBlockId(const BlockId& block_id) {
     return false;
   }
 
-  std::lock_guard<simple_spinlock> l(lock_);
-  if (ContainsKey(blocks_by_block_id_, block_id)) {
+  int index = block_id.id() & kBlockMapMask;
+  std::lock_guard<simple_spinlock> l(*managed_block_shards_[index].lock);
+  if (ContainsKey(*managed_block_shards_[index].blocks_by_block_id, block_id)) {
     return false;
   }
-  return InsertIfNotPresent(&open_block_ids_, block_id);
+
+  return InsertIfNotPresent(&managed_block_shards_[index].open_block_ids, block_id);
 }
 
-LogBlockRefPtr LogBlockManager::AddLogBlock(
+LogBlockRefPtr LogBlockManager::CreateAndAddLogBlock(
     LogBlockContainerRefPtr container,
     const BlockId& block_id,
     int64_t offset,
     int64_t length) {
-  std::lock_guard<simple_spinlock> l(lock_);
   LogBlockRefPtr lb(new LogBlock(std::move(container), block_id, offset, length));
   mem_tracker_->Consume(kudu_malloc_usable_size(lb.get()));
 
-  if (AddLogBlockUnlocked(lb)) {
+  if (AddLogBlock(lb)) {
     return lb;
   }
   return nullptr;
 }
 
-bool LogBlockManager::AddLogBlockUnlocked(LogBlockRefPtr lb) {
-  DCHECK(lock_.is_locked());
-
+bool LogBlockManager::AddLogBlock(LogBlockRefPtr lb) {
   // InsertIfNotPresent doesn't use move semantics, so instead we just
   // insert an empty scoped_refptr and assign into it down below rather
   // than using the utility function.
-  LogBlockRefPtr* entry_ptr = &blocks_by_block_id_[lb->block_id()];
+  int index = lb->block_id().id() & kBlockMapMask;
+  std::lock_guard<simple_spinlock> l(*managed_block_shards_[index].lock);
+  auto& blocks_by_block_id = *managed_block_shards_[index].blocks_by_block_id;
+  LogBlockRefPtr* entry_ptr = &blocks_by_block_id[lb->block_id()];
   if (*entry_ptr) {
     // Already have an entry for this block ID.
     return false;
@@ -2245,9 +2262,9 @@ bool LogBlockManager::AddLogBlockUnlocked(LogBlockRefPtr lb) {
   VLOG(2) << Substitute("Added block: id $0, offset $1, length $2",
                         lb->block_id().ToString(), lb->offset(), lb->length());
 
-  // There may already be an entry in open_block_ids_ (e.g. we just finished
+  // There may already be an entry in open_block_ids (e.g. we just finished
   // writing out a block).
-  open_block_ids_.erase(lb->block_id());
+  managed_block_shards_[index].open_block_ids.erase(lb->block_id());
   if (metrics()) {
     metrics()->blocks_under_management->Increment();
     metrics()->bytes_under_management->IncrementBy(lb->length());
@@ -2263,21 +2280,18 @@ Status LogBlockManager::RemoveLogBlocks(vector<BlockId> block_ids,
   Status first_failure;
   vector<LogBlockRefPtr> lbs;
   int64_t malloc_space = 0, blocks_length = 0;
-  {
-    std::lock_guard<simple_spinlock> l(lock_);
-    for (const auto& block_id : block_ids) {
-      LogBlockRefPtr lb;
-      Status s = RemoveLogBlockUnlocked(block_id, &lb);
-      // If we get NotFound, then the block was already deleted.
-      if (!s.ok() && !s.IsNotFound()) {
-        if (first_failure.ok()) first_failure = s;
-      } else if (s.ok()) {
-        malloc_space += kudu_malloc_usable_size(lb.get());
-        blocks_length += lb->length();
-        lbs.emplace_back(std::move(lb));
-      } else {
-        deleted->emplace_back(block_id);
-      }
+  for (const auto& block_id : block_ids) {
+    LogBlockRefPtr lb;
+    Status s = RemoveLogBlock(block_id, &lb);
+    // If we get NotFound, then the block was already deleted.
+    if (!s.ok() && !s.IsNotFound()) {
+      if (first_failure.ok()) first_failure = s;
+    } else if (s.ok()) {
+      malloc_space += kudu_malloc_usable_size(lb.get());
+      blocks_length += lb->length();
+      lbs.emplace_back(std::move(lb));
+    } else {
+      deleted->emplace_back(block_id);
     }
   }
 
@@ -2322,10 +2336,14 @@ Status LogBlockManager::RemoveLogBlocks(vector<BlockId> block_ids,
   return first_failure;
 }
 
-Status LogBlockManager::RemoveLogBlockUnlocked(const BlockId& block_id,
-                                               LogBlockRefPtr* lb) {
-  auto it = blocks_by_block_id_.find(block_id);
-  if (it == blocks_by_block_id_.end()) {
+Status LogBlockManager::RemoveLogBlock(const BlockId& block_id,
+                                       LogBlockRefPtr* lb) {
+  int index = block_id.id() & kBlockMapMask;
+  std::lock_guard<simple_spinlock> l(*managed_block_shards_[index].lock);
+  auto& blocks_by_block_id = managed_block_shards_[index].blocks_by_block_id;
+
+  auto it = blocks_by_block_id->find(block_id);
+  if (it == blocks_by_block_id->end()) {
     return Status::NotFound("Can't find block", block_id.ToString());
   }
 
@@ -2345,7 +2363,7 @@ Status LogBlockManager::RemoveLogBlockUnlocked(const BlockId& block_id,
     }
   }
   *lb = std::move(it->second);
-  blocks_by_block_id_.erase(it);
+  blocks_by_block_id->erase(it);
 
   VLOG(2) << Substitute("Removed block: id $0, offset $1, length $2",
                         (*lb)->block_id().ToString(), (*lb)->offset(), (*lb)->length());
@@ -2582,29 +2600,23 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
 
     next_block_id_.StoreMax(max_block_id + 1);
 
-    // Under the lock, merge this map into the main block map and add
-    // the container.
-    {
-      std::lock_guard<simple_spinlock> l(lock_);
-      // To avoid cacheline contention during startup, we aggregate all of the
-      // memory in a local and add it to the mem-tracker in a single increment
-      // at the end of this loop.
-      int64_t mem_usage = 0;
-      for (UntrackedBlockMap::value_type& e : live_blocks) {
-        int block_mem = kudu_malloc_usable_size(e.second.get());
-        if (!AddLogBlockUnlocked(std::move(e.second))) {
-          // TODO(adar): track as an inconsistency?
-          LOG(FATAL) << "Found duplicate CREATE record for block " << e.first
-                     << " which already is alive from another container when "
-                     << " processing container " << container->ToString();
-        }
-        mem_usage += block_mem;
+    int64_t mem_usage = 0;
+    for (UntrackedBlockMap::value_type& e : live_blocks) {
+      int block_mem = kudu_malloc_usable_size(e.second.get());
+      if (!AddLogBlock(std::move(e.second))) {
+        // TODO(adar): track as an inconsistency?
+        LOG(FATAL) << "Found duplicate CREATE record for block " << e.first
+                   << " which already is alive from another container when "
+                   << " processing container " << container->ToString();
       }
-
-      mem_tracker_->Consume(mem_usage);
-      AddNewContainerUnlocked(container);
-      MakeContainerAvailableUnlocked(std::move(container));
+      mem_usage += block_mem;
     }
+
+    mem_tracker_->Consume(mem_usage);
+
+    std::lock_guard<simple_spinlock> l(lock_);
+    AddNewContainerUnlocked(container);
+    MakeContainerAvailableUnlocked(std::move(container));
   }
 
   // Like the rest of Open(), repairs are performed per data directory to take
diff --git a/src/kudu/fs/log_block_manager.h b/src/kudu/fs/log_block_manager.h
index f6b1fbd..11c1216 100644
--- a/src/kudu/fs/log_block_manager.h
+++ b/src/kudu/fs/log_block_manager.h
@@ -59,7 +59,6 @@ class LogBlock;
 class LogBlockContainer;
 class LogBlockDeletionTransaction;
 class LogWritableBlock;
-
 struct LogBlockManagerMetrics;
 } // namespace internal
 
@@ -296,21 +295,20 @@ class LogBlockManager : public BlockManager {
   // use), false otherwise.
   bool TryUseBlockId(const BlockId& block_id);
 
-  // Adds a LogBlock to in-memory data structures.
+  // Creates and adds a LogBlock to in-memory data structures.
   //
   // Returns the created LogBlock if it was successfully added or nullptr if a
   // block with that ID was already present.
-  LogBlockRefPtr AddLogBlock(
+  LogBlockRefPtr CreateAndAddLogBlock(
       LogBlockContainerRefPtr container,
       const BlockId& block_id,
       int64_t offset,
       int64_t length);
 
-  // Unlocked variant of AddLogBlock() for an already-constructed LogBlock object.
-  // Must hold 'lock_'.
+  // Adds a LogBlock for an already-constructed LogBlock object.
   //
   // Returns true if the LogBlock was successfully added, false if it was already present.
-  bool AddLogBlockUnlocked(LogBlockRefPtr lb);
+  bool AddLogBlock(LogBlockRefPtr lb);
 
   // Removes the given set of LogBlocks from in-memory data structures, and
   // appends the block deletion metadata to record the on-disk deletion.
@@ -323,12 +321,12 @@ class LogBlockManager : public BlockManager {
                          std::vector<LogBlockRefPtr>* log_blocks,
                          std::vector<BlockId>* deleted);
 
-  // Removes a LogBlock from in-memory data structures. Must hold 'lock_'.
+  // Removes a LogBlock from in-memory data structures.
   // The 'lb' out parameter will be set with the successfully deleted LogBlock.
   //
   // Returns an error of LogBlock cannot be successfully removed.
-  Status RemoveLogBlockUnlocked(const BlockId& block_id,
-                                LogBlockRefPtr* lb);
+  Status RemoveLogBlock(const BlockId& block_id,
+                        LogBlockRefPtr* lb);
 
   // Repairs any inconsistencies for 'dir' described in 'report'.
   //
@@ -408,7 +406,25 @@ class LogBlockManager : public BlockManager {
   // interesting (e.g. LogBlocks).
   std::shared_ptr<MemTracker> mem_tracker_;
 
-  // Protects the block map, container structures, and 'dirty_dirs'.
+  // Block IDs container used to prevent collisions when creating new anonymous blocks.
+  struct ManagedBlockShard {
+    // Protects 'blocks_by_block_id' and 'open_block_ids'.
+    std::unique_ptr<simple_spinlock> lock;
+
+    // Maps block IDs to blocks that are now readable, either because they
+    // already existed on disk when the block manager was opened, or because
+    // they're WritableBlocks that were closed.
+    std::unique_ptr<BlockMap> blocks_by_block_id;
+
+    // Contains block IDs for WritableBlocks that are still open for writing.
+    // When a WritableBlock is closed, its ID is moved to 'blocks_by_block_id'.
+    BlockIdSet open_block_ids;
+  };
+
+  // The block ID containers, sharded by block ID.
+  std::vector<ManagedBlockShard> managed_block_shards_;
+
+  // Protects 'all_containers_by_name_', 'available_containers_by_data_dir_' and 'dirty_dirs'.
   mutable simple_spinlock lock_;
 
   // Maps a data directory to an upper bound on the number of blocks that a
@@ -419,18 +435,6 @@ class LogBlockManager : public BlockManager {
   // Manages files opened for reading.
   FileCache<RWFile> file_cache_;
 
-  // Maps block IDs to blocks that are now readable, either because they
-  // already existed on disk when the block manager was opened, or because
-  // they're WritableBlocks that were closed.
-  BlockMap blocks_by_block_id_;
-
-  // Contains block IDs for WritableBlocks that are still open for writing.
-  // When a WritableBlock is closed, its ID is moved to blocks_by_block_id.
-  //
-  // Together with blocks_by_block_id's keys, used to prevent collisions
-  // when creating new anonymous blocks.
-  BlockIdSet open_block_ids_;
-
   // Holds (and owns) all containers loaded from disk.
   std::unordered_map<std::string,
                      LogBlockContainerRefPtr> all_containers_by_name_;