You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2019/12/09 04:16:05 UTC

[kudu] 05/05: KUDU-3001 Multi-thread to load containers in a data directory

This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 6b6910870ce2c35bf8b9be9408f44a8cec6b580a
Author: Yingchun Lai <40...@qq.com>
AuthorDate: Fri Nov 15 19:47:57 2019 +0800

    KUDU-3001 Multi-thread to load containers in a data directory
    
    When a data directory has many block containers, loading the container
    files with a single thread is inefficient; we can speed it up by loading
    them with multiple threads.
    
    We ran some simple benchmarks to verify this. We set
    'log_container_max_size' to 1GB so that more containers are generated,
    set 'startup_benchmark_data_dir_count_for_testing' to 8 so that the
    existing concurrent loading of data directories stays in effect, and
    varied 'fs_max_thread_count_per_data_dir' and
    'startup_benchmark_block_count_for_testing'. For each configuration
    we timed 10 calls to ReopenBlockManager(). The results, in
    milliseconds, are as follows:
    
    disk type: SSD
                             |                          new version
    Block count  old version | 1 thread | 2 threads | 4 threads | 8 threads | 16 threads | 32 threads
        100,000        2,375      2,382       2,342       2,372       2,343        2,353        2,393
      1,000,000       24,018     23,813      22,628      22,407      22,367       22,636       23,173
      2,000,000       50,163     51,120      39,726      37,589      37,671       37,501       37,710
      4,000,000      104,051    105,560      90,427      79,778      73,129       73,205       74,947
      8,000,000      214,347    216,210     199,456     159,143     157,190      158,798      157,056
    
    disk type: spinning disk
                             |                          new version
    Block count  old version | 1 thread | 2 threads | 4 threads | 8 threads | 16 threads | 32 threads
        100,000        3,207      3,347       3,345       3,279       3,237        3,263        3,221
      1,000,000       33,659     34,106      32,081      30,261      30,142       30,115       30,876
      2,000,000       68,097     74,939      56,976      51,407      50,957       56,299       58,456
      4,000,000      146,503    162,389     116,956     104,435      94,905      102,606      100,526
      8,000,000      331,201    349,609     267,259     247,069     243,064      247,810      247,472
    
    Change-Id: I0721ee4a5a6824db146ba0658e60eec25dd0c65c
    Reviewed-on: http://gerrit.cloudera.org:8080/14743
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
    Tested-by: Adar Dembo <ad...@cloudera.com>
---
 src/kudu/fs/data_dirs.cc              |   6 +-
 src/kudu/fs/log_block_manager-test.cc |  10 +-
 src/kudu/fs/log_block_manager.cc      | 516 +++++++++++++++++++---------------
 src/kudu/fs/log_block_manager.h       |  15 +-
 4 files changed, 318 insertions(+), 229 deletions(-)

diff --git a/src/kudu/fs/data_dirs.cc b/src/kudu/fs/data_dirs.cc
index b7df6ed..ddeb55f 100644
--- a/src/kudu/fs/data_dirs.cc
+++ b/src/kudu/fs/data_dirs.cc
@@ -101,6 +101,10 @@ DEFINE_bool(fs_data_dirs_consider_available_space, true,
 TAG_FLAG(fs_data_dirs_consider_available_space, runtime);
 TAG_FLAG(fs_data_dirs_consider_available_space, evolving);
 
+DEFINE_uint64(fs_max_thread_count_per_data_dir, 8,
+              "Maximum work thread per data directory.");
+TAG_FLAG(fs_max_thread_count_per_data_dir, advanced);
+
 METRIC_DEFINE_gauge_uint64(server, data_dirs_failed,
                            "Data Directories Failed",
                            kudu::MetricUnit::kDataDirectories,
@@ -788,7 +792,7 @@ Status DataDirManager::Open() {
     // Create a per-dir thread pool.
     unique_ptr<ThreadPool> pool;
     RETURN_NOT_OK(ThreadPoolBuilder(Substitute("data dir $0", i))
-                  .set_max_threads(1)
+                  .set_max_threads(FLAGS_fs_max_thread_count_per_data_dir)
                   .set_trace_metric_prefix("data dirs")
                   .Build(&pool));
     unique_ptr<DataDir> dd(new DataDir(
diff --git a/src/kudu/fs/log_block_manager-test.cc b/src/kudu/fs/log_block_manager-test.cc
index 01952c8..b1eec55 100644
--- a/src/kudu/fs/log_block_manager-test.cc
+++ b/src/kudu/fs/log_block_manager-test.cc
@@ -86,6 +86,10 @@ DECLARE_string(block_manager_preflush_control);
 DECLARE_string(env_inject_eio_globs);
 DECLARE_uint64(log_container_preallocate_bytes);
 DECLARE_uint64(log_container_max_size);
+DEFINE_int32(startup_benchmark_block_count_for_testing, 1000000,
+             "Block count to do startup benchmark.");
+DEFINE_int32(startup_benchmark_data_dir_count_for_testing, 8,
+             "Data directories to do startup benchmark.");
 
 // Block manager metrics.
 METRIC_DECLARE_counter(block_manager_total_blocks_deleted);
@@ -1005,10 +1009,8 @@ TEST_F(LogBlockManagerTest, TestParseKernelRelease) {
 //
 // However it still can be used to micro-optimize the startup process.
 TEST_F(LogBlockManagerTest, StartupBenchmark) {
-  const int kTestDataDirCount = 8;
-  FLAGS_fs_target_data_dirs_per_tablet = kTestDataDirCount;
   std::vector<std::string> test_dirs;
-  for (int i = 0; i < kTestDataDirCount; ++i) {
+  for (int i = 0; i < FLAGS_startup_benchmark_data_dir_count_for_testing; ++i) {
     test_dirs.emplace_back(test_dir_ + "/" + std::to_string(i));
   }
   // Re-open block manager to place data on multiple data directories.
@@ -1023,7 +1025,7 @@ TEST_F(LogBlockManagerTest, StartupBenchmark) {
   // See http://yoshinorimatsunobu.blogspot.com/2014/03/how-syncfilerange-really-works.html
   // for details.
   FLAGS_block_manager_preflush_control = "never";
-  const int kNumBlocks = AllowSlowTests() ? 1000000 : 1000;
+  const int kNumBlocks = AllowSlowTests() ? FLAGS_startup_benchmark_block_count_for_testing : 1000;
   // Creates 'kNumBlocks' blocks with minimal data.
   {
     unique_ptr<BlockCreationTransaction> transaction = bm_->NewCreationTransaction();
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index 3789c38..af182f7 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -20,6 +20,7 @@
 #include <errno.h>
 
 #include <algorithm>
+#include <cerrno>
 #include <cstddef>
 #include <cstdint>
 #include <map>
@@ -63,7 +64,6 @@
 #include "kudu/util/locks.h"
 #include "kudu/util/malloc.h"
 #include "kudu/util/metrics.h"
-#include "kudu/util/monotime.h"
 #include "kudu/util/path_util.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/random.h"
@@ -1500,6 +1500,37 @@ void LogBlockDeletionTransaction::AddBlock(const LogBlockRefPtr& lb) {
 }
 
 ////////////////////////////////////////////////////////////
+// LogBlockContainerLoadResult
+////////////////////////////////////////////////////////////
+// Results of loading one container; Open() also merges them into one result per data dir.
+struct LogBlockContainerLoadResult {
+  Status status;    // Failure from LoadContainer(), or Repair()'s result via RepairTask().
+  FsReport report;  // Consistency-check findings and stats gathered while loading.
+
+  // Keep track of containers that have nothing but dead blocks; they will be
+  // deleted during repair.
+  vector<LogBlockContainerRefPtr> dead_containers;
+  // Keep track of containers whose live block ratio is low; their metadata
+  // files will be compacted during repair.
+  unordered_map<string, vector<BlockRecordPB>> low_live_block_containers;
+  // Keep track of deleted blocks whose space hasn't been punched; they will
+  // be repunched during repair.
+  vector<LogBlockRefPtr> need_repunching_blocks;
+
+  LogBlockContainerLoadResult() {
+    // We are going to perform these checks.
+    //
+    // Note: this isn't necessarily the complete set of FsReport checks; there
+    // may be checks that the LBM cannot perform.
+    report.full_container_space_check.emplace();
+    report.incomplete_container_check.emplace();
+    report.malformed_record_check.emplace();
+    report.misaligned_block_check.emplace();
+    report.partial_record_check.emplace();
+  }
+};
+
+////////////////////////////////////////////////////////////
 // LogBlock (definition)
 ////////////////////////////////////////////////////////////
 
@@ -1939,6 +1970,21 @@ LogBlockManager::~LogBlockManager() {
   dd_manager_->WaitOnClosures();
 }
 
+// Ensure that no load task failed without being handled.
+//
+// Currently only disk failures are handled. Reports from failed disks are
+// unusable.
+#define RETURN_ON_NON_DISK_FAILURE(d, s)                        \
+  do {                                                          \
+    if (PREDICT_FALSE(!(s).ok())) {                             \
+      if (!(s).IsDiskFailure()) {                               \
+        return s;                                               \
+      }                                                         \
+      LOG(ERROR) << Substitute("Not using report from $0: $1",  \
+          (d)->dir(), (s).ToString());                          \
+    }                                                           \
+  } while (false)
+
 Status LogBlockManager::Open(FsReport* report) {
   RETURN_NOT_OK(file_cache_.Init());
 
@@ -1981,8 +2027,10 @@ Status LogBlockManager::Open(FsReport* report) {
     InsertOrDie(&block_limits_by_data_dir_, dd.get(), limit);
   }
 
-  vector<FsReport> reports(dd_manager_->data_dirs().size());
+  // Open containers in each data dirs.
   vector<Status> statuses(dd_manager_->data_dirs().size());
+  vector<vector<unique_ptr<internal::LogBlockContainerLoadResult>>> container_results(
+      dd_manager_->data_dirs().size());
   int i = -1;
   for (const auto& dd : dd_manager_->data_dirs()) {
     i++;
@@ -1994,12 +2042,13 @@ Status LogBlockManager::Open(FsReport* report) {
       statuses[i] = Status::IOError("Data directory failed", "", EIO);
       continue;
     }
+
     // Open the data dir asynchronously.
     dd->ExecClosure(
         Bind(&LogBlockManager::OpenDataDir,
              Unretained(this),
              dd.get(),
-             &reports[i],
+             &container_results[i],
              &statuses[i]));
   }
 
@@ -2007,26 +2056,73 @@ Status LogBlockManager::Open(FsReport* report) {
   for (const auto& dd : dd_manager_->data_dirs()) {
     dd->WaitOnClosures();
   }
-  if (dd_manager_->GetFailedDataDirs().size() == dd_manager_->data_dirs().size()) {
-    return Status::IOError("All data dirs failed to open", "", EIO);
+
+  // Check load errors and merge each data dir's container load results, then do repair tasks.
+  vector<unique_ptr<internal::LogBlockContainerLoadResult>> dir_results(
+      dd_manager_->data_dirs().size());
+  for (int i = 0; i < dd_manager_->data_dirs().size(); ++i) {
+    const auto& s = statuses[i];
+    const auto& dd = dd_manager_->data_dirs()[i];
+    RETURN_ON_NON_DISK_FAILURE(dd, s);
+    // If open dir error, do not try to repair.
+    if (PREDICT_FALSE(!s.ok())) {
+      continue;
+    }
+
+    unique_ptr<internal::LogBlockContainerLoadResult> dir_result(
+        new internal::LogBlockContainerLoadResult());
+    dir_result->report.data_dirs.push_back(dd->dir());
+    bool do_repair = true;
+    for (const auto& container_result : container_results[i]) {
+      RETURN_ON_NON_DISK_FAILURE(dd, container_result->status);
+      if (PREDICT_FALSE(!container_result->status.ok())) {
+        // A container hit a disk failure (already logged above): skip repair for this dir.
+        do_repair = false;
+        break;
+      }
+      // Fold this container's report and pending repair work into the per-dir result.
+      dir_result->report.MergeFrom(container_result->report);
+      dir_result->dead_containers.insert(
+          dir_result->dead_containers.end(),
+          container_result->dead_containers.begin(),
+          container_result->dead_containers.end());
+      container_result->dead_containers.clear();
+      dir_result->low_live_block_containers.insert(
+          container_result->low_live_block_containers.begin(),
+          container_result->low_live_block_containers.end());
+      container_result->low_live_block_containers.clear();
+      dir_result->need_repunching_blocks.insert(
+          dir_result->need_repunching_blocks.end(),
+          container_result->need_repunching_blocks.begin(),
+          container_result->need_repunching_blocks.end());
+      container_result->need_repunching_blocks.clear();
+    }
+    if (do_repair) {
+      dir_results[i] = std::move(dir_result);
+      dd->ExecClosure(Bind(&LogBlockManager::RepairTask, Unretained(this),
+                           dd.get(), Unretained(dir_results[i].get())));
+    }
+  }
+
+  // Wait for the repair tasks to complete.
+  for (const auto& dd : dd_manager_->data_dirs()) {
+    dd->WaitOnClosures();
   }
 
-  // Ensure that no open failed without being handled.
-  //
-  // Currently only disk failures are handled. Reports from failed disks are
-  // unusable.
   FsReport merged_report;
-  for (i = 0; i < statuses.size(); i++) {
-    const Status& s = statuses[i];
-    if (PREDICT_TRUE(s.ok())) {
-      merged_report.MergeFrom(reports[i]);
+  for (int i = 0; i < dd_manager_->data_dirs().size(); ++i) {
+    if (PREDICT_FALSE(!dir_results[i])) {
       continue;
     }
-    if (!s.IsDiskFailure()) {
-      return s;
+    if (PREDICT_TRUE(dir_results[i]->status.ok())) {
+      merged_report.MergeFrom(dir_results[i]->report);
+      continue;
     }
-    LOG(ERROR) << Substitute("Not using report from $0: $1",
-        dd_manager_->data_dirs()[i]->dir(), s.ToString());
+    RETURN_ON_NON_DISK_FAILURE(dd_manager_->data_dirs()[i], dir_results[i]->status);
+  }
+
+  if (dd_manager_->GetFailedDataDirs().size() == dd_manager_->data_dirs().size()) {
+    return Status::IOError("All data dirs failed to open", "", EIO);
   }
 
   // Either return or log the report.
@@ -2376,34 +2472,10 @@ Status LogBlockManager::RemoveLogBlock(const BlockId& block_id,
   return Status::OK();
 }
 
-void LogBlockManager::OpenDataDir(DataDir* dir,
-                                  FsReport* report,
-                                  Status* result_status) {
-  FsReport local_report;
-  local_report.data_dirs.push_back(dir->dir());
-
-  // We are going to perform these checks.
-  //
-  // Note: this isn't necessarily the complete set of FsReport checks; there
-  // may be checks that the LBM cannot perform.
-  local_report.full_container_space_check.emplace();
-  local_report.incomplete_container_check.emplace();
-  local_report.malformed_record_check.emplace();
-  local_report.misaligned_block_check.emplace();
-  local_report.partial_record_check.emplace();
-
-  // Keep track of deleted blocks whose space hasn't been punched; they will
-  // be repunched during repair.
-  vector<LogBlockRefPtr> need_repunching;
-
-  // Keep track of containers that have nothing but dead blocks; they will be
-  // deleted during repair.
-  vector<LogBlockContainerRefPtr> dead_containers;
-
-  // Keep track of containers whose live block ratio is low; their metadata
-  // files will be compacted during repair.
-  unordered_map<string, vector<BlockRecordPB>> low_live_block_containers;
-
+void LogBlockManager::OpenDataDir(
+    DataDir* dir,
+    vector<unique_ptr<internal::LogBlockContainerLoadResult>>* results,
+    Status* result_status) {
   // Find all containers and open them.
   unordered_set<string> containers_seen;
   vector<string> children;
@@ -2415,7 +2487,7 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
         "Could not list children of $0", dir->dir()));
     return;
   }
-  MonoTime last_opened_container_log_time = MonoTime::Now();
+
   for (const string& child : children) {
     string container_name;
     if (!TryStripSuffixString(
@@ -2428,14 +2500,16 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
       continue;
     }
 
+    // Add a new result for the container.
+    results->emplace_back(new internal::LogBlockContainerLoadResult());
     LogBlockContainerRefPtr container;
     s = LogBlockContainer::Open(
-        this, dir, &local_report, container_name, &container);
-    if (s.IsAborted()) {
-      // Skip the container. Open() added a record of it to 'local_report' for us.
-      continue;
-    }
+        this, dir, &results->back()->report, container_name, &container);
     if (!s.ok()) {
+      if (s.IsAborted()) {
+        // Skip the container. Open() added a record of it to 'results->back()->report' for us.
+        continue;
+      }
       if (opts_.read_only && s.IsNotFound()) {
         // Skip the container while the operation is read-only and the files are away,
         // especially for the kudu cli tool.
@@ -2446,201 +2520,201 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
       return;
     }
 
-    // Process the records, building a container-local map for live blocks and
-    // a list of dead blocks.
-    //
-    // It's important that we don't try to add these blocks to the global map
-    // incrementally as we see each record, since it's possible that one container
-    // has a "CREATE <b>" while another has a "CREATE <b> ; DELETE <b>" pair.
-    // If we processed those two containers in this order, then upon processing
-    // the second container, we'd think there was a duplicate block. Building
-    // the container-local map first ensures that we discount deleted blocks
-    // before checking for duplicate IDs.
-    //
-    // NOTE: Since KUDU-1538, we allocate sequential block IDs, which makes reuse
-    // exceedingly unlikely. However, we might have old data which still exhibits
-    // the above issue.
-    UntrackedBlockMap live_blocks;
-    BlockRecordMap live_block_records;
-    vector<LogBlockRefPtr> dead_blocks;
-    uint64_t max_block_id = 0;
-    s = container->ProcessRecords(&local_report,
-                                  &live_blocks,
-                                  &live_block_records,
-                                  &dead_blocks,
-                                  &max_block_id);
-    if (!s.ok()) {
-      *result_status = s.CloneAndPrepend(Substitute(
-          "Could not process records in container $0", container->ToString()));
-      return;
+    // Load the container's records asynchronously.
+    dir->ExecClosure(Bind(&LogBlockManager::LoadContainer, Unretained(this),
+                          dir, container, Unretained(results->back().get())));
+  }
+}
+
+void LogBlockManager::LoadContainer(DataDir* dir,
+                                    LogBlockContainerRefPtr container,
+                                    internal::LogBlockContainerLoadResult* result) {
+  // Process the records, building a container-local map for live blocks and
+  // a list of dead blocks.
+  //
+  // It's important that we don't try to add these blocks to the global map
+  // incrementally as we see each record, since it's possible that one container
+  // has a "CREATE <b>" while another has a "CREATE <b> ; DELETE <b>" pair.
+  // If we processed those two containers in this order, then upon processing
+  // the second container, we'd think there was a duplicate block. Building
+  // the container-local map first ensures that we discount deleted blocks
+  // before checking for duplicate IDs.
+  //
+  // NOTE: Since KUDU-1538, we allocate sequential block IDs, which makes reuse
+  // exceedingly unlikely. However, we might have old data which still exhibits
+  // the above issue.
+  UntrackedBlockMap live_blocks;
+  BlockRecordMap live_block_records;
+  vector<LogBlockRefPtr> dead_blocks;
+  uint64_t max_block_id = 0;
+  Status s = container->ProcessRecords(&result->report,
+                                       &live_blocks,
+                                       &live_block_records,
+                                       &dead_blocks,
+                                       &max_block_id);
+  if (!s.ok()) {
+    result->status = s.CloneAndPrepend(Substitute(
+        "Could not process records in container $0", container->ToString()));
+    return;
+  }
+
+  // With deleted blocks out of the way, check for misaligned blocks.
+  //
+  // We could also enforce that the record's offset is aligned with the
+  // underlying filesystem's block size, an invariant maintained by the log
+  // block manager. However, due to KUDU-1793, that invariant may have been
+  // broken, so we'll note but otherwise allow it.
+  for (const auto& e : live_blocks) {
+    if (PREDICT_FALSE(e.second->offset() %
+                      container->instance()->filesystem_block_size_bytes() != 0)) {
+      result->report.misaligned_block_check->entries.emplace_back(
+          container->ToString(), e.first);
+
     }
+  }
 
-    // With deleted blocks out of the way, check for misaligned blocks.
+  if (container->full()) {
+    // Full containers without any live blocks can be deleted outright.
     //
-    // We could also enforce that the record's offset is aligned with the
-    // underlying filesystem's block size, an invariant maintained by the log
-    // block manager. However, due to KUDU-1793, that invariant may have been
-    // broken, so we'll note but otherwise allow it.
-    for (const auto& e : live_blocks) {
-      if (PREDICT_FALSE(e.second->offset() %
-                        container->instance()->filesystem_block_size_bytes() != 0)) {
-        local_report.misaligned_block_check->entries.emplace_back(
-            container->ToString(), e.first);
-
+    // TODO(adar): this should be reported as an inconsistency once dead
+    // container deletion is also done in real time. Until then, it would be
+    // confusing to report it as such since it'll be a natural event at startup.
+    if (container->live_blocks() == 0) {
+      DCHECK(live_blocks.empty());
+      result->dead_containers.emplace_back(container);
+    } else if (static_cast<double>(container->live_blocks()) /
+        container->total_blocks() <= FLAGS_log_container_live_metadata_before_compact_ratio) {
+      // Metadata files of containers with very few live blocks will be compacted.
+      //
+      // TODO(adar): this should be reported as an inconsistency once
+      // container metadata compaction is also done in realtime. Until then,
+      // it would be confusing to report it as such since it'll be a natural
+      // event at startup.
+      vector<BlockRecordPB> records(live_block_records.size());
+      int i = 0;
+      for (auto& e : live_block_records) {
+        records[i].Swap(&e.second);
+        i++;
       }
-    }
 
-    if (container->full()) {
-      // Full containers without any live blocks can be deleted outright.
+      // Sort the records such that their ordering reflects the ordering in
+      // the pre-compacted metadata file.
       //
-      // TODO(adar): this should be reported as an inconsistency once dead
-      // container deletion is also done in real time. Until then, it would be
-      // confusing to report it as such since it'll be a natural event at startup.
-      if (container->live_blocks() == 0) {
-        DCHECK(live_blocks.empty());
-        dead_containers.emplace_back(container);
-      } else if (static_cast<double>(container->live_blocks()) /
-          container->total_blocks() <= FLAGS_log_container_live_metadata_before_compact_ratio) {
-        // Metadata files of containers with very few live blocks will be compacted.
-        //
-        // TODO(adar): this should be reported as an inconsistency once
-        // container metadata compaction is also done in realtime. Until then,
-        // it would be confusing to report it as such since it'll be a natural
-        // event at startup.
-        vector<BlockRecordPB> records(live_block_records.size());
-        int i = 0;
-        for (auto& e : live_block_records) {
-          records[i].Swap(&e.second);
-          i++;
+      // This is preferred to storing the records in an order-preserving
+      // container (such as std::map) because while records are temporarily
+      // retained for every container, only some containers will actually
+      // undergo metadata compaction.
+      std::sort(records.begin(), records.end(),
+                [](const BlockRecordPB& a, const BlockRecordPB& b) {
+        // Sort by timestamp.
+        if (a.timestamp_us() != b.timestamp_us()) {
+          return a.timestamp_us() < b.timestamp_us();
         }
 
-        // Sort the records such that their ordering reflects the ordering in
-        // the pre-compacted metadata file.
+        // If the timestamps match, sort by offset.
         //
-        // This is preferred to storing the records in an order-preserving
-        // container (such as std::map) because while records are temporarily
-        // retained for every container, only some containers will actually
-        // undergo metadata compaction.
-        std::sort(records.begin(), records.end(),
-                  [](const BlockRecordPB& a, const BlockRecordPB& b) {
-          // Sort by timestamp.
-          if (a.timestamp_us() != b.timestamp_us()) {
-            return a.timestamp_us() < b.timestamp_us();
-          }
-
-          // If the timestamps match, sort by offset.
-          //
-          // If the offsets also match (i.e. both blocks are of zero length),
-          // it doesn't matter which of the two records comes first.
-          return a.offset() < b.offset();
-        });
-
-        low_live_block_containers[container->ToString()] = std::move(records);
-      }
+        // If the offsets also match (i.e. both blocks are of zero length),
+        // it doesn't matter which of the two records comes first.
+        return a.offset() < b.offset();
+      });
 
-      // Having processed the block records, let's check whether any full
-      // containers have any extra space (left behind after a crash or from an
-      // older version of Kudu).
-      //
-      // Filesystems are unpredictable beasts and may misreport the amount of
-      // space allocated to a file in various interesting ways. Some examples:
-      // - XFS's speculative preallocation feature may artificially enlarge the
-      //   container's data file without updating its file size. This makes the
-      //   file size untrustworthy for the purposes of measuring allocated space.
-      //   See KUDU-1856 for more details.
-      // - On el6.6/ext4 a container data file that consumed ~32K according to
-      //   its extent tree was actually reported as consuming an additional fs
-      //   block (2k) of disk space. A similar container data file (generated
-      //   via the same workload) on Ubuntu 16.04/ext4 did not exhibit this.
-      //   The suspicion is that older versions of ext4 include interior nodes
-      //   of the extent tree when reporting file block usage.
-      //
-      // To deal with these issues, our extra space cleanup code (deleted block
-      // repunching and container truncation) is gated on an "actual disk space
-      // consumed" heuristic. To prevent unnecessary triggering of the
-      // heuristic, we allow for some slop in our size measurements. The exact
-      // amount of slop is configurable via
-      // log_container_excess_space_before_cleanup_fraction.
-      //
-      // Too little slop and we'll do unnecessary work at startup. Too much and
-      // more unused space may go unreclaimed.
-      string data_filename = StrCat(container->ToString(), kContainerDataFileSuffix);
-      uint64_t reported_size;
-      s = env_->GetFileSizeOnDisk(data_filename, &reported_size);
-      if (!s.ok()) {
-        HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb(
-            ErrorHandlerType::DISK_ERROR, dir));
-        *result_status = s.CloneAndPrepend(Substitute(
-            "Could not get on-disk file size of container $0", container->ToString()));
-        return;
-      }
-      int64_t cleanup_threshold_size = container->live_bytes_aligned() *
-          (1 + FLAGS_log_container_excess_space_before_cleanup_fraction);
-      if (reported_size > cleanup_threshold_size) {
-        local_report.full_container_space_check->entries.emplace_back(
-            container->ToString(), reported_size - container->live_bytes_aligned());
-
-        // If the container is to be deleted outright, don't bother repunching
-        // its blocks. The report entry remains, however, so it's clear that
-        // there was a space discrepancy.
-        if (container->live_blocks()) {
-          need_repunching.insert(need_repunching.end(),
-                                 dead_blocks.begin(), dead_blocks.end());
-        }
-      }
+      result->low_live_block_containers[container->ToString()] = std::move(records);
+    }
 
-      local_report.stats.lbm_full_container_count++;
+    // Having processed the block records, let's check whether any full
+    // containers have any extra space (left behind after a crash or from an
+    // older version of Kudu).
+    //
+    // Filesystems are unpredictable beasts and may misreport the amount of
+    // space allocated to a file in various interesting ways. Some examples:
+    // - XFS's speculative preallocation feature may artificially enlarge the
+    //   container's data file without updating its file size. This makes the
+    //   file size untrustworthy for the purposes of measuring allocated space.
+    //   See KUDU-1856 for more details.
+    // - On el6.6/ext4 a container data file that consumed ~32K according to
+    //   its extent tree was actually reported as consuming an additional fs
+    //   block (2k) of disk space. A similar container data file (generated
+    //   via the same workload) on Ubuntu 16.04/ext4 did not exhibit this.
+    //   The suspicion is that older versions of ext4 include interior nodes
+    //   of the extent tree when reporting file block usage.
+    //
+    // To deal with these issues, our extra space cleanup code (deleted block
+    // repunching and container truncation) is gated on an "actual disk space
+    // consumed" heuristic. To prevent unnecessary triggering of the
+    // heuristic, we allow for some slop in our size measurements. The exact
+    // amount of slop is configurable via
+    // log_container_excess_space_before_cleanup_fraction.
+    //
+    // Too little slop and we'll do unnecessary work at startup. Too much and
+    // more unused space may go unreclaimed.
+    string data_filename = StrCat(container->ToString(), kContainerDataFileSuffix);
+    uint64_t reported_size;
+    s = env_->GetFileSizeOnDisk(data_filename, &reported_size);
+    if (!s.ok()) {
+      HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb(
+          ErrorHandlerType::DISK_ERROR, dir));
+      result->status = s.CloneAndPrepend(Substitute(
+          "Could not get on-disk file size of container $0", container->ToString()));
+      return;
     }
-    local_report.stats.live_block_bytes += container->live_bytes();
-    local_report.stats.live_block_bytes_aligned += container->live_bytes_aligned();
-    local_report.stats.live_block_count += container->live_blocks();
-    local_report.stats.lbm_container_count++;
-
-    // Log number of containers opened every 10 seconds
-    MonoTime now = MonoTime::Now();
-    if ((now - last_opened_container_log_time).ToSeconds() > 10) {
-      LOG(INFO) << Substitute("Opened $0 log block containers in $1",
-                              local_report.stats.lbm_container_count, dir->dir());
-      last_opened_container_log_time = now;
+    int64_t cleanup_threshold_size = container->live_bytes_aligned() *
+        (1 + FLAGS_log_container_excess_space_before_cleanup_fraction);
+    if (reported_size > cleanup_threshold_size) {
+      result->report.full_container_space_check->entries.emplace_back(
+          container->ToString(), reported_size - container->live_bytes_aligned());
+
+      // If the container is to be deleted outright, don't bother repunching
+      // its blocks. The report entry remains, however, so it's clear that
+      // there was a space discrepancy.
+      if (container->live_blocks()) {
+        result->need_repunching_blocks.insert(result->need_repunching_blocks.end(),
+                                              dead_blocks.begin(), dead_blocks.end());
+      }
     }
 
-    next_block_id_.StoreMax(max_block_id + 1);
+    result->report.stats.lbm_full_container_count++;
+  }
+  result->report.stats.live_block_bytes += container->live_bytes();
+  result->report.stats.live_block_bytes_aligned += container->live_bytes_aligned();
+  result->report.stats.live_block_count += container->live_blocks();
+  result->report.stats.lbm_container_count++;
 
-    int64_t mem_usage = 0;
-    for (UntrackedBlockMap::value_type& e : live_blocks) {
-      int block_mem = kudu_malloc_usable_size(e.second.get());
-      if (!AddLogBlock(std::move(e.second))) {
-        // TODO(adar): track as an inconsistency?
-        LOG(FATAL) << "Found duplicate CREATE record for block " << e.first
-                   << " which already is alive from another container when "
-                   << " processing container " << container->ToString();
-      }
-      mem_usage += block_mem;
+  next_block_id_.StoreMax(max_block_id + 1);
+
+  int64_t mem_usage = 0;
+  for (UntrackedBlockMap::value_type& e : live_blocks) {
+    int block_mem = kudu_malloc_usable_size(e.second.get());
+    if (!AddLogBlock(std::move(e.second))) {
+      // TODO(adar): track as an inconsistency?
+      LOG(FATAL) << "Found duplicate CREATE record for block " << e.first
+                 << " which already is alive from another container when "
+                 << " processing container " << container->ToString();
     }
+    mem_usage += block_mem;
+  }
 
-    mem_tracker_->Consume(mem_usage);
+  mem_tracker_->Consume(mem_usage);
 
+  int64_t container_count = 0;
+  {
     std::lock_guard<simple_spinlock> l(lock_);
     AddNewContainerUnlocked(container);
     MakeContainerAvailableUnlocked(std::move(container));
+    container_count = all_containers_by_name_.size();
   }
 
-  // Like the rest of Open(), repairs are performed per data directory to take
-  // advantage of parallelism.
-  s = Repair(dir,
-             &local_report,
-             std::move(need_repunching),
-             std::move(dead_containers),
-             std::move(low_live_block_containers));
-  if (!s.ok()) {
-    *result_status = s.CloneAndPrepend(Substitute(
-        "fatal error while repairing inconsistencies in data directory $0",
-        dir->dir()));
-    return;
+  // Log every 200 number of log block containers
+  if (container_count % 200 == 0) {
+    LOG(INFO) << Substitute("Opened $0 log block containers", container_count);
   }
+}
 
-  *report = std::move(local_report);
-  *result_status = Status::OK();
+void LogBlockManager::RepairTask(DataDir* dir, internal::LogBlockContainerLoadResult* result) {
+  result->status = Repair(dir,
+                          &result->report,
+                          std::move(result->need_repunching_blocks),
+                          std::move(result->dead_containers),
+                          std::move(result->low_live_block_containers));
 }
 
 #define RETURN_NOT_OK_LBM_DISK_FAILURE_PREPEND(status_expr, msg) do { \
diff --git a/src/kudu/fs/log_block_manager.h b/src/kudu/fs/log_block_manager.h
index 7631042..3c26f90 100644
--- a/src/kudu/fs/log_block_manager.h
+++ b/src/kudu/fs/log_block_manager.h
@@ -59,6 +59,7 @@ class LogBlock;
 class LogBlockContainer;
 class LogBlockDeletionTransaction;
 class LogWritableBlock;
+struct LogBlockContainerLoadResult;
 struct LogBlockManagerMetrics;
 } // namespace internal
 
@@ -328,6 +329,9 @@ class LogBlockManager : public BlockManager {
   Status RemoveLogBlock(const BlockId& block_id,
                         LogBlockRefPtr* lb);
 
+  // Simple wrapper of Repair(), used as a runnable function in thread.
+  void RepairTask(DataDir* dir, internal::LogBlockContainerLoadResult* result);
+
   // Repairs any inconsistencies for 'dir' described in 'report'.
   //
   // The following additional repairs will be performed:
@@ -358,14 +362,19 @@ class LogBlockManager : public BlockManager {
                              int64_t* file_bytes_delta);
 
   // Opens a particular data directory belonging to the block manager. The
-  // results of consistency checking (and repair, if applicable) are written to
-  // 'report'.
+  // results of consistency checking are written to 'results'.
   //
   // Success or failure is set in 'result_status'.
   void OpenDataDir(DataDir* dir,
-                   FsReport* report,
+                   std::vector<std::unique_ptr<internal::LogBlockContainerLoadResult>>* results,
                    Status* result_status);
 
+  // Reads records from one log block container in the data directory.
+  // The result details will be collected into 'result'.
+  void LoadContainer(DataDir* dir,
+                     LogBlockContainerRefPtr container,
+                     internal::LogBlockContainerLoadResult* result);
+
   // Perform basic initialization.
   Status Init();