You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by la...@apache.org on 2022/03/24 07:34:40 UTC

[kudu] branch master updated: KUDU-3318 [LBM] Runtime compact log container metadata

This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new a31a251  KUDU-3318 [LBM] Runtime compact log container metadata
a31a251 is described below

commit a31a2512c90193a2d972c6826a3ad30ab23c95c8
Author: Yingchun Lai <ac...@gmail.com>
AuthorDate: Wed Sep 15 20:47:48 2021 +0800

    KUDU-3318 [LBM] Runtime compact log container metadata
    
    Commit 49abc3a6788bc29c8c023cafff0f0955142a2409 added logic that
    treats a log container as full once its metadata file reaches a size
    limit, after which no more CREATE records are appended to that
    container. However, the metadata file still consumes disk space
    even when its live block ratio is very low.
    This patch adds a method that compacts a container's metadata at
    runtime when, while appending DELETE records to it, the live block
    ratio is found to be low.
    
    Change-Id: I72bdf73fa73403918c01dfa9d45530fad47ecaf6
    Reviewed-on: http://gerrit.cloudera.org:8080/17871
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Tested-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/fs/log_block_manager-test.cc | 103 +++++++++++-
 src/kudu/fs/log_block_manager.cc      | 289 ++++++++++++++++++++++++++--------
 2 files changed, 321 insertions(+), 71 deletions(-)

diff --git a/src/kudu/fs/log_block_manager-test.cc b/src/kudu/fs/log_block_manager-test.cc
index 9ad958d..c8f05e9 100644
--- a/src/kudu/fs/log_block_manager-test.cc
+++ b/src/kudu/fs/log_block_manager-test.cc
@@ -65,6 +65,7 @@
 #include "kudu/util/stopwatch.h" // IWYU pragma: keep
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
+#include "kudu/util/threadpool.h"
 
 using kudu::pb_util::ReadablePBContainerFile;
 using std::set;
@@ -88,6 +89,8 @@ DECLARE_string(env_inject_eio_globs);
 DECLARE_uint64(log_container_preallocate_bytes);
 DECLARE_uint64(log_container_max_size);
 DECLARE_uint64(log_container_metadata_max_size);
+DECLARE_bool(log_container_metadata_runtime_compact);
+DECLARE_double(log_container_metadata_size_before_compact_ratio);
 DEFINE_int32(startup_benchmark_block_count_for_testing, 1000000,
              "Block count to do startup benchmark.");
 DEFINE_int32(startup_benchmark_data_dir_count_for_testing, 8,
@@ -202,6 +205,10 @@ class LogBlockManagerTest : public KuduTest, public ::testing::WithParamInterfac
     *metadata_file = metadata_files[0];
   }
 
+  void GetContainerMetadataFiles(vector<string>* metadata_files) {
+    DoGetContainers(METADATA_FILES, metadata_files);
+  }
+
   // Like GetOnlyContainerDataFile(), but returns a container name (i.e. data
   // or metadata file with the file suffix removed).
   void GetOnlyContainer(string* container) {
@@ -449,10 +456,7 @@ TEST_P(LogBlockManagerTest, MetricsTest) {
           {1, &METRIC_block_manager_total_blocks_deleted},
           {0, &METRIC_log_block_manager_dead_containers_deleted} }));
   }
-  // Wait for the actual hole punching to take place.
-  for (const auto& data_dir : dd_manager_->dirs()) {
-    data_dir->WaitOnClosures();
-  }
+  dd_manager_->WaitOnClosures();
   NO_FATALS(CheckLogMetrics(new_entity,
       { {9 * 1024, &METRIC_log_block_manager_bytes_under_management},
         {10, &METRIC_log_block_manager_blocks_under_management},
@@ -1270,8 +1274,99 @@ TEST_P(LogBlockManagerTest, TestContainerBlockLimitingByMetadataSize) {
   NO_FATALS(AssertNumContainers(4));
 }
 
+TEST_F(LogBlockManagerTest, TestContainerBlockLimitingByMetadataSizeWithCompaction) {
+  const int kNumBlocks = 2000;
+  const int kNumThreads = 10;
+  const double kLiveBlockRatio = 0.1;
+
+  // Creates 'kNumBlocks' blocks, then deletes most of them in one transaction.
+  auto create_and_delete_blocks = [&]() {
+    vector<BlockId> ids;
+    // Creates 'kNumBlocks' blocks.
+    for (int i = 0; i < kNumBlocks; i++) {
+      unique_ptr<WritableBlock> block;
+      RETURN_NOT_OK(bm_->CreateBlock(test_block_opts_, &block));
+      RETURN_NOT_OK(block->Append("aaaa"));
+      RETURN_NOT_OK(block->Close());
+      ids.push_back(block->id());
+    }
+
+    // Deletes roughly 'kNumBlocks * (1 - kLiveBlockRatio)' blocks, chosen at random.
+    shared_ptr<BlockDeletionTransaction> deletion_transaction =
+        bm_->NewDeletionTransaction();
+    for (const auto& id : ids) {
+      if (rand() % 100 < 100 * kLiveBlockRatio) {
+        continue;
+      }
+      deletion_transaction->AddDeletedBlock(id);
+    }
+    vector<BlockId> deleted;
+    RETURN_NOT_OK(deletion_transaction->CommitDeletedBlocks(&deleted));
+
+    return Status::OK();
+  };
+
+  // Create a thread pool to create and delete blocks concurrently.
+  unique_ptr<ThreadPool> pool;
+  ASSERT_OK(ThreadPoolBuilder("test-metadata-compact-pool")
+                .set_max_threads(kNumThreads)
+                .Build(&pool));
+  auto mt_create_and_delete_blocks = [&]() {
+    for (int i = 0; i < kNumThreads; ++i) {
+      ASSERT_OK(pool->Submit(create_and_delete_blocks));
+    }
+    pool->Wait();
+    dd_manager_->WaitOnClosures();
+  };
+
+  FLAGS_log_container_metadata_runtime_compact = true;
+  // Use a small limit so that metadata files fill up quickly.
+  FLAGS_log_container_metadata_max_size = 32 * 1024;
+  NO_FATALS(mt_create_and_delete_blocks());
+  vector<string> metadata_files;
+  NO_FATALS(GetContainerMetadataFiles(&metadata_files));
+  for (const auto& metadata_file : metadata_files) {
+    uint64_t file_size;
+    ASSERT_OK(env_->GetFileSize(metadata_file, &file_size));
+    ASSERT_GE(FLAGS_log_container_metadata_max_size *
+                  FLAGS_log_container_metadata_size_before_compact_ratio,
+              file_size);
+  }
+
+  // Reopen and test again.
+  ASSERT_OK(ReopenBlockManager());
+  NO_FATALS(mt_create_and_delete_blocks());
+  NO_FATALS(GetContainerMetadataFiles(&metadata_files));
+  for (const auto& metadata_file : metadata_files) {
+    uint64_t file_size;
+    ASSERT_OK(env_->GetFileSize(metadata_file, &file_size));
+    ASSERT_GE(FLAGS_log_container_metadata_max_size *
+                  FLAGS_log_container_metadata_size_before_compact_ratio,
+              file_size);
+  }
+
+  // Now disable runtime compaction and create/delete more blocks. At least one
+  // metadata file should grow beyond the compaction size threshold.
+  FLAGS_log_container_metadata_runtime_compact = false;
+  ASSERT_OK(ReopenBlockManager());
+  NO_FATALS(mt_create_and_delete_blocks());
+  NO_FATALS(GetContainerMetadataFiles(&metadata_files));
+  bool exist_larger_one = false;
+  for (const auto& metadata_file : metadata_files) {
+    uint64_t file_size;
+    ASSERT_OK(env_->GetFileSize(metadata_file, &file_size));
+    if (file_size > FLAGS_log_container_metadata_max_size *
+                         FLAGS_log_container_metadata_size_before_compact_ratio) {
+      exist_larger_one = true;
+      break;
+    }
+  }
+  ASSERT_TRUE(exist_larger_one);
+}
+
 TEST_P(LogBlockManagerTest, TestMisalignedBlocksFuzz) {
   EnableEncryption(GetParam());
+
   FLAGS_log_container_preallocate_bytes = 0;
   const int kNumBlocks = 100;
 
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index 41275ba..7a68d06 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -61,6 +61,7 @@
 #include "kudu/util/fault_injection.h"
 #include "kudu/util/file_cache.h"
 #include "kudu/util/flag_tags.h"
+#include "kudu/util/flag_validators.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/malloc.h"
 #include "kudu/util/metrics.h"
@@ -68,6 +69,7 @@
 #include "kudu/util/pb_util.h"
 #include "kudu/util/random.h"
 #include "kudu/util/random_util.h"
+#include "kudu/util/rw_mutex.h"
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/sorted_disjoint_interval_list.h"
@@ -91,6 +93,12 @@ TAG_FLAG(log_container_metadata_max_size, advanced);
 TAG_FLAG(log_container_metadata_max_size, experimental);
 TAG_FLAG(log_container_metadata_max_size, runtime);
 
+DEFINE_bool(log_container_metadata_runtime_compact, false,
+            "Whether to enable metadata file compaction at runtime.");
+TAG_FLAG(log_container_metadata_runtime_compact, advanced);
+TAG_FLAG(log_container_metadata_runtime_compact, experimental);
+TAG_FLAG(log_container_metadata_runtime_compact, runtime);
+
 DEFINE_int64(log_container_max_blocks, -1,
              "Maximum number of blocks (soft) of a log container. Use 0 for "
              "no limit. Use -1 for no limit except in the case of a kernel "
@@ -112,9 +120,21 @@ TAG_FLAG(log_container_excess_space_before_cleanup_fraction, advanced);
 DEFINE_double(log_container_live_metadata_before_compact_ratio, 0.50,
               "Desired ratio of live block metadata in log containers. If a "
               "container's live to total block ratio dips below this value, "
-              "the container's metadata file will be compacted at startup.");
+              "the container's metadata file will be compacted at startup, "
+              "or at runtime, based on the configuration of "
+              "--log_container_metadata_size_before_compact_ratio.");
 TAG_FLAG(log_container_live_metadata_before_compact_ratio, experimental);
 
+DEFINE_double(log_container_metadata_size_before_compact_ratio, 0.80,
+              "Desired portion of --log_container_metadata_max_size container metadata must "
+              "consume before metadata is considered for compaction. If a container's metadata "
+              "file exceeds --log_container_metadata_max_size * "
+              "--log_container_metadata_size_before_compact_ratio, and the number of live blocks "
+              "falls below --log_container_live_metadata_before_compact_ratio, the container's "
+              "metadata will be compacted.");
+TAG_FLAG(log_container_metadata_size_before_compact_ratio, advanced);
+TAG_FLAG(log_container_metadata_size_before_compact_ratio, experimental);
+
 DEFINE_bool(log_block_manager_test_hole_punching, true,
             "Ensure hole punching is supported by the underlying filesystem");
 TAG_FLAG(log_block_manager_test_hole_punching, advanced);
@@ -203,6 +223,19 @@ using std::unordered_set;
 using std::vector;
 using strings::Substitute;
 
+bool ValidateMetadataCompactFlags() {
+  if (FLAGS_log_container_metadata_runtime_compact &&
+      FLAGS_log_container_metadata_size_before_compact_ratio <= 0) {
+    LOG(ERROR) << Substitute(
+        "--log_container_metadata_size_before_compact_ratio ($0) must be greater than 0 when "
+        "enabling --log_container_metadata_runtime_compact",
+        FLAGS_log_container_metadata_size_before_compact_ratio);
+    return false;
+  }
+  return true;
+}
+GROUP_FLAG_VALIDATOR(metadata_compact_flags, ValidateMetadataCompactFlags);
+
 namespace internal {
 
 ////////////////////////////////////////////////////////////
@@ -464,13 +497,6 @@ class LogBlockContainer: public RefCountedThreadSafe<LogBlockContainer> {
   // Does not guarantee data durability; use SyncData() for that.
   Status FlushData(int64_t offset, int64_t length);
 
-  // Asynchronously flush this container's metadata file (all dirty bits).
-  //
-  // Does not guarantee metadata durability; use SyncMetadata() for that.
-  //
-  // TODO(unknown): Add support to just flush a range.
-  Status FlushMetadata();
-
   // Synchronize this container's data file with the disk. On success,
   // guarantees that the data is made durable.
   //
@@ -497,6 +523,13 @@ class LogBlockContainer: public RefCountedThreadSafe<LogBlockContainer> {
   // This function is thread unsafe.
   Status TruncateDataToNextBlockOffset();
 
+  // Whether to update internal counters based on processed records. This may be
+  // useful to avoid recomputing container statistics during operations that
+  // don't change them, e.g. compacting container metadata.
+  enum class ProcessRecordType {
+    kReadOnly,       // Read records only.
+    kReadAndUpdate,  // Read records and update container's statistic.
+  };
   // Reads the container's metadata from disk, sanity checking and processing
   // records along the way.
   //
@@ -512,7 +545,8 @@ class LogBlockContainer: public RefCountedThreadSafe<LogBlockContainer> {
       LogBlockManager::UntrackedBlockMap* live_blocks,
       LogBlockManager::BlockRecordMap* live_block_records,
       vector<LogBlockRefPtr>* dead_blocks,
-      uint64_t* max_block_id);
+      uint64_t* max_block_id,
+      ProcessRecordType type);
 
   // Updates internal bookkeeping state to reflect the creation of a block.
   void BlockCreated(const LogBlockRefPtr& block);
@@ -566,11 +600,23 @@ class LogBlockContainer: public RefCountedThreadSafe<LogBlockContainer> {
   int64_t live_blocks() const { return live_blocks_.Load(); }
   int32_t blocks_being_written() const { return blocks_being_written_.Load(); }
   bool full() const {
-    return next_block_offset() >= FLAGS_log_container_max_size ||
-        (max_num_blocks_ && (total_blocks() >= max_num_blocks_)) ||
-        (FLAGS_log_container_metadata_max_size > 0 &&
-         (metadata_file_->Offset() >= FLAGS_log_container_metadata_max_size));
+    if (next_block_offset() >= FLAGS_log_container_max_size ||
+        (max_num_blocks_ && total_blocks() >= max_num_blocks_)) {
+      return true;
+    }
+
+    if (FLAGS_log_container_metadata_max_size <= 0) {
+      return false;
+    }
+
+    // Try lock before reading metadata offset, consider it not full if lock failed.
+    shared_lock<RWMutex> l(metadata_compact_lock_, std::try_to_lock);
+    if (!l.owns_lock()) {
+      return false;
+    }
+    return metadata_file_->Offset() >= FLAGS_log_container_metadata_max_size;
   }
+
   bool dead() const { return dead_.Load(); }
   const LogBlockManagerMetrics* metrics() const { return metrics_; }
   Dir* data_dir() const { return data_dir_; }
@@ -607,6 +653,26 @@ class LogBlockContainer: public RefCountedThreadSafe<LogBlockContainer> {
     return dead_.CompareAndSet(false, true);
   }
 
+  bool ShouldCompact() const {
+    shared_lock<RWMutex> l(metadata_compact_lock_);
+    return ShouldCompactUnlocked();
+  }
+
+  bool ShouldCompactUnlocked() const {
+    DCHECK_GT(FLAGS_log_container_metadata_max_size, 0);
+    if (live_blocks() >=
+        total_blocks() * FLAGS_log_container_live_metadata_before_compact_ratio) {
+      return false;
+    }
+
+    return metadata_file_->Offset() >= FLAGS_log_container_metadata_max_size *
+                                       FLAGS_log_container_metadata_size_before_compact_ratio;
+  }
+
+  void CompactMetadata();
+
+  static std::vector<BlockRecordPB> SortRecords(LogBlockManager::BlockRecordMap live_block_records);
+
  private:
   LogBlockContainer(LogBlockManager* block_manager, Dir* data_dir,
                     unique_ptr<WritablePBContainerFile> metadata_file,
@@ -656,7 +722,8 @@ class LogBlockContainer: public RefCountedThreadSafe<LogBlockContainer> {
       LogBlockManager::BlockRecordMap* live_block_records,
       vector<LogBlockRefPtr>* dead_blocks,
       uint64_t* data_file_size,
-      uint64_t* max_block_id);
+      uint64_t* max_block_id,
+      ProcessRecordType type);
 
   // Updates this container data file's position based on the offset and length
   // of a block, marking this container as full if needed. Should only be called
@@ -677,6 +744,10 @@ class LogBlockContainer: public RefCountedThreadSafe<LogBlockContainer> {
   // Offset up to which we have preallocated bytes.
   int64_t preallocated_offset_ = 0;
 
+  // Protects 'metadata_file_'. Only rewriting the file takes the write lock;
+  // appending and syncing only need the read lock, because there is an
+  // internal lock for these operations in WritablePBContainerFile.
+  mutable RWMutex metadata_compact_lock_;
   // Opened file handles to the container's files.
   unique_ptr<WritablePBContainerFile> metadata_file_;
   shared_ptr<RWFile> data_file_;
@@ -687,6 +758,9 @@ class LogBlockContainer: public RefCountedThreadSafe<LogBlockContainer> {
   // The amount of data (post block alignment) written thus far to the container.
   AtomicInt<int64_t> total_bytes_;
 
+  // TODO(yingchun): add metadata bytes for metadata.
+  //AtomicInt<int64_t> metadata_bytes_;
+
   // The number of blocks written thus far in the container.
   AtomicInt<int64_t> total_blocks_;
 
@@ -735,6 +809,7 @@ LogBlockContainer::LogBlockContainer(
       data_dir_(data_dir),
       max_num_blocks_(FindOrDie(block_manager->block_limits_by_data_dir_,
                                 data_dir)),
+      metadata_compact_lock_(RWMutex::Priority::PREFER_READING),
       metadata_file_(std::move(metadata_file)),
       data_file_(std::move(data_file)),
       next_block_offset_(0),
@@ -786,6 +861,69 @@ void LogBlockContainer::HandleError(const Status& s) const {
                                                                data_dir_));
 }
 
+void LogBlockContainer::CompactMetadata() {
+  SCOPED_LOG_SLOW_EXECUTION(WARNING, 5, Substitute("CompactMetadata $0", ToString()));
+  // Don't block if the exclusive lock is unavailable (a compaction or an append may be in
+  // progress); a later block deletion can schedule compaction again.
+  std::unique_lock<RWMutex> l(metadata_compact_lock_, std::try_to_lock);
+  if (!l.owns_lock()) {
+    return;
+  }
+  // Re-check now that the lock is held, in case the metadata was compacted in the meantime.
+  if (!ShouldCompactUnlocked()) {
+    return;
+  }
+
+  FsReport report;
+  report.full_container_space_check.emplace();
+  report.incomplete_container_check.emplace();
+  report.malformed_record_check.emplace();
+  report.misaligned_block_check.emplace();
+  report.partial_record_check.emplace();
+
+  LogBlockManager::UntrackedBlockMap live_blocks;
+  LogBlockManager::BlockRecordMap live_block_records;
+  vector<LogBlockRefPtr> dead_blocks;
+  uint64_t max_block_id = 0;
+
+  Status s;
+  SCOPED_CLEANUP({
+    if (!s.ok()) {
+      // Make container read-only to forbid further writes in case of failure.
+      // Because the on-disk state may contain partial/incomplete data/metadata at
+      // this point, it is not safe to either overwrite it or append to it.
+      SetReadOnly(s);
+    }
+  });
+  s = ProcessRecords(
+      &report, &live_blocks, &live_block_records, &dead_blocks, &max_block_id,
+      ProcessRecordType::kReadOnly);  // Statistics are already tracked; don't update them again.
+  if (!s.ok()) {
+    WARN_NOT_OK(s, Substitute("Could not process records in container $0", ToString()));
+    return;
+  }
+
+  vector<BlockRecordPB> records = SortRecords(std::move(live_block_records));
+  int64_t file_bytes_delta;
+  s = block_manager_->RewriteMetadataFile(*this, records, &file_bytes_delta);
+  if (!s.ok()) {
+    WARN_NOT_OK(s, Substitute("Could not rewrite container metadata file $0", ToString()));
+    return;
+  }
+
+  // If the new metadata file can't be reopened, the cleanup above makes the container read-only.
+  s = ReopenMetadataWriter();
+  if (!s.ok()) {
+    WARN_NOT_OK(s, Substitute("Could not reopen new metadata file $0", ToString()));
+    return;
+  }
+  VLOG(1) << "Compacted metadata file " << ToString()
+          << " (saved " << file_bytes_delta << " bytes)";
+
+  total_blocks_.Store(live_blocks.size());
+  live_blocks_.Store(live_blocks.size());
+}
+
 #define RETURN_NOT_OK_CONTAINER_DISK_FAILURE(status_expr) do { \
   RETURN_NOT_OK_HANDLE_DISK_FAILURE((status_expr), \
     block_manager->error_manager()->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR, dir)); \
@@ -1016,7 +1154,8 @@ Status LogBlockContainer::ProcessRecords(
     LogBlockManager::UntrackedBlockMap* live_blocks,
     LogBlockManager::BlockRecordMap* live_block_records,
     vector<LogBlockRefPtr>* dead_blocks,
-    uint64_t* max_block_id) {
+    uint64_t* max_block_id,
+    ProcessRecordType type) {
   string metadata_path = metadata_file_->filename();
   unique_ptr<RandomAccessFile> metadata_reader;
   RandomAccessFileOptions opts;
@@ -1036,7 +1175,7 @@ Status LogBlockContainer::ProcessRecords(
     }
     RETURN_NOT_OK(ProcessRecord(&record, report,
                                 live_blocks, live_block_records, dead_blocks,
-                                &data_file_size, max_block_id));
+                                &data_file_size, max_block_id, type));
   }
 
   // NOTE: 'read_status' will never be OK here.
@@ -1065,7 +1204,8 @@ Status LogBlockContainer::ProcessRecord(
     LogBlockManager::BlockRecordMap* live_block_records,
     vector<LogBlockRefPtr>* dead_blocks,
     uint64_t* data_file_size,
-    uint64_t* max_block_id) {
+    uint64_t* max_block_id,
+    ProcessRecordType type) {
   const BlockId block_id(BlockId::FromPB(record->block_id()));
   LogBlockRefPtr lb;
   switch (record->op_type()) {
@@ -1114,15 +1254,17 @@ Status LogBlockContainer::ProcessRecord(
                             block_id.ToString(),
                             record->offset(), record->length());
 
-      // This block must be included in the container's logical size, even if
-      // it has since been deleted. This helps satisfy one of our invariants:
-      // once a container byte range has been used, it may never be reused in
-      // the future.
-      //
-      // If we ignored deleted blocks, we would end up reusing the space
-      // belonging to the last deleted block in the container.
-      UpdateNextBlockOffset(lb->offset(), lb->length());
-      BlockCreated(lb);
+      if (type == ProcessRecordType::kReadAndUpdate) {
+        // This block must be included in the container's logical size, even if
+        // it has since been deleted. This helps satisfy one of our invariants:
+        // once a container byte range has been used, it may never be reused in
+        // the future.
+        //
+        // If we ignored deleted blocks, we would end up reusing the space
+        // belonging to the last deleted block in the container.
+        UpdateNextBlockOffset(lb->offset(), lb->length());
+        BlockCreated(lb);
+      }
 
       (*live_block_records)[block_id].Swap(record);
       *max_block_id = std::max(*max_block_id, block_id.id());
@@ -1137,7 +1279,9 @@ Status LogBlockContainer::ProcessRecord(
         break;
       }
       VLOG(2) << Substitute("Found DELETE block $0", block_id.ToString());
-      BlockDeleted(lb);
+      if (type == ProcessRecordType::kReadAndUpdate) {
+        BlockDeleted(lb);
+      }
 
       CHECK_EQ(1, live_block_records->erase(block_id));
       dead_blocks->emplace_back(std::move(lb));
@@ -1243,6 +1387,7 @@ Status LogBlockContainer::AppendMetadata(const BlockRecordPB& pb) {
   RETURN_NOT_OK_HANDLE_ERROR(read_only_status());
   // Note: We don't check for sufficient disk space for metadata writes in
   // order to allow for block deletion on full disks.
+  shared_lock<RWMutex> l(metadata_compact_lock_);
   RETURN_NOT_OK_HANDLE_ERROR(metadata_file_->Append(pb));
   return Status::OK();
 }
@@ -1255,12 +1400,6 @@ Status LogBlockContainer::FlushData(int64_t offset, int64_t length) {
   return Status::OK();
 }
 
-Status LogBlockContainer::FlushMetadata() {
-  RETURN_NOT_OK_HANDLE_ERROR(read_only_status());
-  RETURN_NOT_OK_HANDLE_ERROR(metadata_file_->Flush());
-  return Status::OK();
-}
-
 Status LogBlockContainer::SyncData() {
   RETURN_NOT_OK_HANDLE_ERROR(read_only_status());
   if (FLAGS_enable_data_block_fsync) {
@@ -1274,6 +1413,7 @@ Status LogBlockContainer::SyncMetadata() {
   RETURN_NOT_OK_HANDLE_ERROR(read_only_status());
   if (FLAGS_enable_data_block_fsync) {
     if (metrics_) metrics_->generic_metrics.total_disk_sync->Increment();
+    shared_lock<RWMutex> l(metadata_compact_lock_);
     RETURN_NOT_OK_HANDLE_ERROR(metadata_file_->Sync());
   }
   return Status::OK();
@@ -1379,6 +1519,42 @@ void LogBlockContainer::UpdateNextBlockOffset(int64_t block_offset, int64_t bloc
   }
 }
 
+vector<BlockRecordPB> LogBlockContainer::SortRecords(
+    LogBlockManager::BlockRecordMap live_block_records) {
+  // Moves the records out of 'live_block_records' and returns them sorted.
+  // TODO(adar): low live ratios found at startup should be reported as an
+  // inconsistency once runtime metadata compaction is the norm; until then
+  // it would be confusing, since they are a natural event at startup.
+  vector<BlockRecordPB> records(live_block_records.size());
+  int i = 0;
+  for (auto& e : live_block_records) {
+    records[i].Swap(&e.second);
+    i++;
+  }
+
+  // Sort the records such that their ordering reflects the ordering in
+  // the pre-compacted metadata file.
+  //
+  // This is preferred to storing the records in an order-preserving
+  // container (such as std::map) because while records are temporarily
+  // retained for every container, only some containers will actually
+  // undergo metadata compaction.
+  std::sort(records.begin(), records.end(), [](const BlockRecordPB& a, const BlockRecordPB& b) {
+    // Sort by timestamp.
+    if (a.timestamp_us() != b.timestamp_us()) {
+      return a.timestamp_us() < b.timestamp_us();
+    }
+
+    // If the timestamps match, sort by offset.
+    //
+    // If the offsets also match (i.e. both blocks are of zero length),
+    // it doesn't matter which of the two records comes first.
+    return a.offset() < b.offset();
+  });
+
+  return records;
+}
+
 void LogBlockContainer::BlockCreated(const LogBlockRefPtr& block) {
   DCHECK_GE(block->offset(), 0);
 
@@ -2531,6 +2707,16 @@ Status LogBlockManager::RemoveLogBlocks(vector<BlockId> block_ids,
             "Unable to append deletion record to block metadata");
       }
     } else {
+      // Metadata files of containers with very few live blocks will be compacted.
+      if (!lb->container()->read_only() &&
+          FLAGS_log_container_metadata_runtime_compact &&
+          lb->container()->ShouldCompact()) {
+        scoped_refptr<LogBlockContainer> self(lb->container());
+        lb->container()->ExecClosure([self]() {
+          self->CompactMetadata();
+        });
+      }
+
       deleted->emplace_back(lb->block_id());
       log_blocks->emplace_back(std::move(lb));
     }
@@ -2670,7 +2856,8 @@ void LogBlockManager::LoadContainer(Dir* dir,
                                        &live_blocks,
                                        &live_block_records,
                                        &dead_blocks,
-                                       &max_block_id);
+                                       &max_block_id,
+                                       LogBlockContainer::ProcessRecordType::kReadAndUpdate);
   if (!s.ok()) {
     result->status = s.CloneAndPrepend(Substitute(
         "Could not process records in container $0", container->ToString()));
@@ -2704,39 +2891,7 @@ void LogBlockManager::LoadContainer(Dir* dir,
     } else if (static_cast<double>(container->live_blocks()) /
         container->total_blocks() <= FLAGS_log_container_live_metadata_before_compact_ratio) {
       // Metadata files of containers with very few live blocks will be compacted.
-      //
-      // TODO(adar): this should be reported as an inconsistency once
-      // container metadata compaction is also done in realtime. Until then,
-      // it would be confusing to report it as such since it'll be a natural
-      // event at startup.
-      vector<BlockRecordPB> records(live_block_records.size());
-      int i = 0;
-      for (auto& e : live_block_records) {
-        records[i].Swap(&e.second);
-        i++;
-      }
-
-      // Sort the records such that their ordering reflects the ordering in
-      // the pre-compacted metadata file.
-      //
-      // This is preferred to storing the records in an order-preserving
-      // container (such as std::map) because while records are temporarily
-      // retained for every container, only some containers will actually
-      // undergo metadata compaction.
-      std::sort(records.begin(), records.end(),
-                [](const BlockRecordPB& a, const BlockRecordPB& b) {
-        // Sort by timestamp.
-        if (a.timestamp_us() != b.timestamp_us()) {
-          return a.timestamp_us() < b.timestamp_us();
-        }
-
-        // If the timestamps match, sort by offset.
-        //
-        // If the offsets also match (i.e. both blocks are of zero length),
-        // it doesn't matter which of the two records comes first.
-        return a.offset() < b.offset();
-      });
-
+      vector<BlockRecordPB> records = LogBlockContainer::SortRecords(std::move(live_block_records));
       result->low_live_block_containers[container->ToString()] = std::move(records);
     }