Posted to commits@kudu.apache.org by ad...@apache.org on 2017/05/02 21:13:54 UTC

[2/3] kudu git commit: log_block_manager: refactor container record processing

log_block_manager: refactor container record processing

While working on some other LBM stuff, I noticed that
LogBlockManager::ProcessBlockRecord() was effectively static; this patch
moves it into LogBlockContainer. Besides narrowing the function's scope,
we can now process records as they are read rather than storing them in a
deque to be processed later (sketched below).
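
For illustration only, a minimal, self-contained sketch of that shape change
(Record, ReadNext, and Process are hypothetical stand-ins, not the real
BlockRecordPB/ReadablePBContainerFile plumbing): the old flow buffered every
record in a deque for a second pass, the new flow hands each record to the
processing step as soon as it is read.

#include <deque>
#include <iostream>
#include <optional>
#include <vector>

struct Record { int id; };

// Hypothetical reader: yields records one at a time until the "file" runs out.
std::optional<Record> ReadNext(std::vector<Record>* file) {
  if (file->empty()) return std::nullopt;
  Record r = file->front();
  file->erase(file->begin());
  return r;
}

// Hypothetical per-record processing step.
void Process(const Record& r, std::vector<int>* out) { out->push_back(r.id); }

int main() {
  std::vector<Record> file = {{1}, {2}, {3}};

  // Old shape: buffer every record in a deque, then process in a second pass.
  std::vector<Record> copy = file;
  std::deque<Record> buffered;
  while (auto r = ReadNext(&copy)) buffered.emplace_back(*r);
  std::vector<int> old_result;
  for (const Record& r : buffered) Process(r, &old_result);

  // New shape: process each record as soon as it is read; no intermediate deque.
  std::vector<int> new_result;
  while (auto r = ReadNext(&file)) Process(*r, &new_result);

  std::cout << (old_result == new_result ? "same result" : "differ") << std::endl;
  return 0;
}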

To make this refactor work, I had to move the LogBlock definition so that
LogBlockContainer could use it. This also means various LogBlockContainer
methods can now take LogBlock-typed arguments, and GetAlignedBlockLength()
can move over to LogBlock where it belongs.
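
The alignment rule itself does not change when GetAlignedBlockLength() becomes
LogBlock::fs_aligned_length(); for reference, a standalone sketch of it
(align_up is a stand-in for the real KUDU_ALIGN_UP macro):

#include <cstdint>
#include <iostream>

int64_t align_up(int64_t value, int64_t alignment) {
  return ((value + alignment - 1) / alignment) * alignment;
}

// Blocks that start on a filesystem block boundary have their length rounded
// up to the next fs block; misaligned blocks (KUDU-1793) are deliberately
// undercounted so that hole punching never reclaims too much.
int64_t fs_aligned_length(int64_t offset, int64_t length, int64_t fs_block_size) {
  if (offset % fs_block_size == 0) {
    return align_up(length, fs_block_size);
  }
  return length;
}

int main() {
  std::cout << fs_aligned_length(8192, 100, 4096) << std::endl;  // 4096: aligned start
  std::cout << fs_aligned_length(8200, 100, 4096) << std::endl;  // 100: misaligned start
  return 0;
}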

Change-Id: I5b8da74dc699f4afbeda41ee154eb1ba89dac292
Reviewed-on: http://gerrit.cloudera.org:8080/6769
Tested-by: Adar Dembo <ad...@cloudera.com>
Reviewed-by: David Ribeiro Alves <da...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/72e58eca
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/72e58eca
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/72e58eca

Branch: refs/heads/master
Commit: 72e58eca93c36ed2dcce23032503b3d964a34236
Parents: 6a11859
Author: Adar Dembo <ad...@cloudera.com>
Authored: Sun Apr 30 20:51:13 2017 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Tue May 2 21:11:41 2017 +0000

----------------------------------------------------------------------
 src/kudu/fs/log_block_manager.cc | 453 +++++++++++++++++-----------------
 src/kudu/fs/log_block_manager.h  |  26 +-
 2 files changed, 238 insertions(+), 241 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/72e58eca/src/kudu/fs/log_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index c866fc0..7e18c33 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -167,6 +167,59 @@ LogBlockManagerMetrics::LogBlockManagerMetrics(const scoped_refptr<MetricEntity>
 #undef MINIT
 
 ////////////////////////////////////////////////////////////
+// LogBlock (declaration)
+////////////////////////////////////////////////////////////
+
+// The persistent metadata that describes a logical block.
+//
+// A block grows a LogBlock when its data has been synchronized with
+// the disk. That's when it's fully immutable (i.e. none of its metadata
+// can change), and when it becomes readable and persistent.
+//
+// LogBlocks are reference counted to simplify support for deletion with
+// outstanding readers. All refcount increments are performed with the
+// block manager lock held, as are deletion-based decrements. However,
+// no lock is held when ~LogReadableBlock decrements the refcount, thus it
+// must be made thread safe (by extending RefCountedThreadSafe instead of
+// the simpler RefCounted).
+class LogBlock : public RefCountedThreadSafe<LogBlock> {
+ public:
+  LogBlock(LogBlockContainer* container, BlockId block_id, int64_t offset,
+           int64_t length);
+  ~LogBlock();
+
+  const BlockId& block_id() const { return block_id_; }
+  LogBlockContainer* container() const { return container_; }
+  int64_t offset() const { return offset_; }
+  int64_t length() const { return length_; }
+
+  // Returns a block's length aligned to the nearest filesystem block size.
+  int64_t fs_aligned_length() const;
+
+  // Delete the block. Actual deletion takes place when the
+  // block is destructed.
+  void Delete();
+
+ private:
+  // The owning container. Must outlive the LogBlock.
+  LogBlockContainer* container_;
+
+  // The block identifier.
+  const BlockId block_id_;
+
+  // The block's offset in the container.
+  const int64_t offset_;
+
+  // The block's length.
+  const int64_t length_;
+
+  // Whether the block has been marked for deletion.
+  bool deleted_;
+
+  DISALLOW_COPY_AND_ASSIGN(LogBlock);
+};
+
+////////////////////////////////////////////////////////////
 // LogBlockContainer
 ////////////////////////////////////////////////////////////
 
@@ -281,18 +334,28 @@ class LogBlockContainer {
   // This function is thread unsafe.
   Status TruncateDataToNextBlockOffset();
 
-  // Reads the container's metadata from disk, sanity checking and
-  // returning the records in 'records'. Any on-disk inconsistencies are
-  // recorded in 'report'.
-  Status ReadContainerRecords(FsReport* report,
-                              deque<BlockRecordPB>* records) const;
+  // Reads the container's metadata from disk, sanity checking and processing
+  // records along the way.
+  //
+  // Malformed records and other container inconsistencies are written to
+  // 'report'. Healthy blocks are written either to 'live_blocks' or
+  // 'dead_blocks'. The greatest block ID seen thus far in the container is
+  // written to 'max_block_id'.
+  //
+  // Returns an error only if there was a problem accessing the container from
+  // disk; such errors are fatal and effectively halt processing immediately.
+  Status ProcessRecords(
+      FsReport* report,
+      LogBlockManager::UntrackedBlockMap* live_blocks,
+      std::vector<scoped_refptr<internal::LogBlock>>* dead_blocks,
+      uint64_t* max_block_id);
 
   // Updates internal bookkeeping state to reflect the creation of a block,
   // marking this container as full if needed. Should only be called when a
   // block is fully written, as it will round up the container data file's position.
   //
   // This function is thread unsafe.
-  void BlockCreated(int64_t block_offset, int64_t block_length);
+  void BlockCreated(const scoped_refptr<LogBlock>& block);
 
   // Updates internal bookkeeping state to reflect the deletion of a block.
   //
@@ -300,7 +363,7 @@ class LogBlockContainer {
   // deletions can happen concurrently with creations.
   //
   // Note: the container is not made "unfull"; containers remain sparse until deleted.
-  void BlockDeleted(int64_t block_offset, int64_t block_length);
+  void BlockDeleted(const scoped_refptr<LogBlock>& block);
 
   // Run a task on this container's data directory thread pool.
   //
@@ -311,9 +374,6 @@ class LogBlockContainer {
   // Produces a debug-friendly string representation of this container.
   string ToString() const;
 
-  // Returns a block's length aligned to the nearest filesystem block size;
-  int64_t GetAlignedBlockLength(int64_t block_offset, int64_t block_length) const;
-
   // Simple accessors.
   LogBlockManager* block_manager() const { return block_manager_; }
   int64_t next_block_offset() const { return next_block_offset_; }
@@ -339,19 +399,23 @@ class LogBlockContainer {
                     unique_ptr<WritablePBContainerFile> metadata_file,
                     shared_ptr<RWFile> data_file);
 
-  // Performs sanity checks on a block record.
+  // Processes a single block record, performing sanity checks on it and adding
+  // it either to 'live_blocks' or 'dead_blocks'.
   //
   // Returns an error only if there was a problem accessing the container from
   // disk; such errors are fatal and effectively halt processing immediately.
   //
   // On success, 'report' is updated with any inconsistencies found in the
   // record, 'data_file_size' may be updated with the latest size of the
-  // container's data file, and 'is_malformed_record' reflects whether or not
-  // the record was malformed.
-  Status CheckBlockRecord(const BlockRecordPB& record,
-                          FsReport* report,
-                          uint64_t* data_file_size,
-                          bool* is_malformed_record) const;
+  // container's data file, and 'max_block_id' reflects the largest block ID
+  // seen thus far in the container.
+  Status ProcessRecord(
+      const BlockRecordPB& record,
+      FsReport* report,
+      LogBlockManager::UntrackedBlockMap* live_blocks,
+      std::vector<scoped_refptr<internal::LogBlock>>* dead_blocks,
+      uint64_t* data_file_size,
+      uint64_t* max_block_id);
 
   // The owning block manager. Must outlive the container itself.
   LogBlockManager* const block_manager_;
@@ -576,8 +640,11 @@ Status LogBlockContainer::TruncateDataToNextBlockOffset() {
   return Status::OK();
 }
 
-Status LogBlockContainer::ReadContainerRecords(FsReport* report,
-                                               deque<BlockRecordPB>* records) const {
+Status LogBlockContainer::ProcessRecords(
+    FsReport* report,
+    LogBlockManager::UntrackedBlockMap* live_blocks,
+    vector<scoped_refptr<internal::LogBlock>>* dead_blocks,
+    uint64_t* max_block_id) {
   string metadata_path = metadata_file_->filename();
   unique_ptr<RandomAccessFile> metadata_reader;
   RETURN_NOT_OK(block_manager()->env()->NewRandomAccessFile(metadata_path, &metadata_reader));
@@ -585,7 +652,6 @@ Status LogBlockContainer::ReadContainerRecords(FsReport* report,
   RETURN_NOT_OK(pb_reader.Open());
 
   uint64_t data_file_size = 0;
-  deque<BlockRecordPB> local_records;
   Status read_status;
   while (true) {
     BlockRecordPB record;
@@ -593,19 +659,14 @@ Status LogBlockContainer::ReadContainerRecords(FsReport* report,
     if (!read_status.ok()) {
       break;
     }
-
-    bool is_malformed_record;
-    RETURN_NOT_OK(CheckBlockRecord(record, report,
-                                   &data_file_size, &is_malformed_record));
-    if (PREDICT_FALSE(!is_malformed_record)) {
-      local_records.emplace_back(std::move(record));
-    }
+    RETURN_NOT_OK(ProcessRecord(record, report,
+                                live_blocks, dead_blocks,
+                                &data_file_size, max_block_id));
   }
 
   // NOTE: 'read_status' will never be OK here.
   if (PREDICT_TRUE(read_status.IsEndOfFile())) {
     // We've reached the end of the file without any problems.
-    records->swap(local_records);
     return Status::OK();
   }
   if (read_status.IsIncomplete()) {
@@ -614,49 +675,95 @@ Status LogBlockContainer::ReadContainerRecords(FsReport* report,
     // write and truncate the metadata file to remove this partial record.
     report->partial_record_check->entries.emplace_back(ToString(),
                                                        pb_reader.offset());
-    records->swap(local_records);
     return Status::OK();
   }
   // If we've made it here, we've found (and are returning) an unrecoverable error.
   return read_status;
 }
 
-Status LogBlockContainer::CheckBlockRecord(const BlockRecordPB& record,
-                                           FsReport* report,
-                                           uint64_t* data_file_size,
-                                           bool* is_malformed_record) const {
-  if (record.op_type() == CREATE) {
-    // First verify that the record's offset/length aren't wildly incorrect.
-    if (PREDICT_FALSE(!record.has_offset() ||
-                      !record.has_length() ||
-                      record.offset() < 0  ||
-                      record.length() < 0)) {
-      report->malformed_record_check->entries.emplace_back(ToString(), record);
-      *is_malformed_record = true;
-      return Status::OK();
-    }
+Status LogBlockContainer::ProcessRecord(
+    const BlockRecordPB& record,
+    FsReport* report,
+    LogBlockManager::UntrackedBlockMap* live_blocks,
+    vector<scoped_refptr<internal::LogBlock>>* dead_blocks,
+    uint64_t* data_file_size,
+    uint64_t* max_block_id) {
+  BlockId block_id(BlockId::FromPB(record.block_id()));
+  scoped_refptr<LogBlock> lb;
+  switch (record.op_type()) {
+    case CREATE:
+      // First verify that the record's offset/length aren't wildly incorrect.
+      if (PREDICT_FALSE(!record.has_offset() ||
+                        !record.has_length() ||
+                        record.offset() < 0  ||
+                        record.length() < 0)) {
+        report->malformed_record_check->entries.emplace_back(ToString(), record);
+        break;
+      }
 
-    // Now it should be safe to access the record's offset/length.
-    //
-    // KUDU-1657: When opening a container in read-only mode which is actively
-    // being written to by another lbm, we must reinspect the data file's size
-    // frequently in order to account for the latest appends. Inspecting the
-    // file size is expensive, so we only do it when the metadata indicates
-    // that additional data has been written to the file.
-    if (PREDICT_FALSE(record.offset() + record.length() > *data_file_size)) {
-      RETURN_NOT_OK(data_file_->Size(data_file_size));
-    }
+      // Now it should be safe to access the record's offset/length.
+      //
+      // KUDU-1657: When opening a container in read-only mode which is actively
+      // being written to by another lbm, we must reinspect the data file's size
+      // frequently in order to account for the latest appends. Inspecting the
+      // file size is expensive, so we only do it when the metadata indicates
+      // that additional data has been written to the file.
+      if (PREDICT_FALSE(record.offset() + record.length() > *data_file_size)) {
+        RETURN_NOT_OK(data_file_->Size(data_file_size));
+      }
+
+      // If the record still extends beyond the end of the file, it is malformed.
+      if (PREDICT_FALSE(record.offset() + record.length() > *data_file_size)) {
+        // TODO(adar): treat as a different kind of inconsistency?
+        report->malformed_record_check->entries.emplace_back(ToString(), record);
+        break;
+      }
+
+      lb = new LogBlock(this, block_id, record.offset(), record.length());
+      if (!InsertIfNotPresent(live_blocks, block_id, lb)) {
+        // We found a record whose ID matches that of an already created block.
+        //
+        // TODO(adar): treat as a different kind of inconsistency?
+        report->malformed_record_check->entries.emplace_back(
+            ToString(), record);
+        break;
+      }
+
+      VLOG(2) << Substitute("Found CREATE block $0 at offset $1 with length $2",
+                            block_id.ToString(),
+                            record.offset(), record.length());
+
+      // This block must be included in the container's logical size, even if
+      // it has since been deleted. This helps satisfy one of our invariants:
+      // once a container byte range has been used, it may never be reused in
+      // the future.
+      //
+      // If we ignored deleted blocks, we would end up reusing the space
+      // belonging to the last deleted block in the container.
+      BlockCreated(lb);
 
-    // If the record still extends beyond the end of the file, it is malformed.
-    if (PREDICT_FALSE(record.offset() + record.length() > *data_file_size)) {
+      *max_block_id = std::max(*max_block_id, block_id.id());
+      break;
+    case DELETE:
+      lb = EraseKeyReturnValuePtr(live_blocks, block_id);
+      if (!lb) {
+        // We found a record for which there is no already created block.
+        //
+        // TODO(adar): treat as a different kind of inconsistency?
+        report->malformed_record_check->entries.emplace_back(ToString(), record);
+        break;
+      }
+      VLOG(2) << Substitute("Found DELETE block $0", block_id.ToString());
+      BlockDeleted(lb);
+      dead_blocks->emplace_back(std::move(lb));
+      break;
+    default:
+      // We found a record with an unknown type.
+      //
       // TODO(adar): treat as a different kind of inconsistency?
       report->malformed_record_check->entries.emplace_back(ToString(), record);
-      *is_malformed_record = true;
-      return Status::OK();
-    }
+      break;
   }
-
-  *is_malformed_record = false;
   return Status::OK();
 }
 
@@ -678,10 +785,10 @@ Status LogBlockContainer::FinishBlock(const Status& s, WritableBlock* block) {
   // will have written some garbage that can be expunged during a GC.
   RETURN_NOT_OK(block_manager()->SyncContainer(*this));
 
-  CHECK(block_manager()->AddLogBlock(this, block->id(),
-                                     next_block_offset_,
-                                     block->BytesAppended()));
-  BlockCreated(next_block_offset_, block->BytesAppended());
+  scoped_refptr<LogBlock> lb = block_manager()->AddLogBlock(
+      this, block->id(), next_block_offset_, block->BytesAppended());
+  CHECK(lb);
+  BlockCreated(lb);
 
   // Truncate the container if it's now full; any left over preallocated space
   // is no longer needed.
@@ -790,8 +897,8 @@ Status LogBlockContainer::EnsurePreallocated(int64_t block_start_offset,
   return Status::OK();
 }
 
-void LogBlockContainer::BlockCreated(int64_t block_offset, int64_t block_length) {
-  DCHECK_GE(block_offset, 0);
+void LogBlockContainer::BlockCreated(const scoped_refptr<LogBlock>& block) {
+  DCHECK_GE(block->offset(), 0);
 
   // The log block manager maintains block contiguity as an invariant, which
   // means accounting for the new block should be as simple as adding its
@@ -804,7 +911,8 @@ void LogBlockContainer::BlockCreated(int64_t block_offset, int64_t block_length)
   // boundary. This guarantees that the disk space can be reclaimed when
   // the block is deleted.
   int64_t new_next_block_offset = KUDU_ALIGN_UP(
-      block_offset + block_length, instance()->filesystem_block_size_bytes());
+      block->offset() + block->length(),
+      instance()->filesystem_block_size_bytes());
   if (PREDICT_FALSE(new_next_block_offset < next_block_offset_)) {
     LOG(WARNING) << Substitute(
         "Container $0 unexpectedly tried to lower its next block offset "
@@ -813,7 +921,7 @@ void LogBlockContainer::BlockCreated(int64_t block_offset, int64_t block_length)
   } else {
     int64_t aligned_block_length = new_next_block_offset - next_block_offset_;
     total_bytes_+= aligned_block_length;
-    live_bytes_.IncrementBy(block_length);
+    live_bytes_.IncrementBy(block->length());
     live_bytes_aligned_.IncrementBy(aligned_block_length);
     next_block_offset_ = new_next_block_offset;
   }
@@ -827,11 +935,11 @@ void LogBlockContainer::BlockCreated(int64_t block_offset, int64_t block_length)
   }
 }
 
-void LogBlockContainer::BlockDeleted(int64_t block_offset, int64_t block_length) {
-  DCHECK_GE(block_offset, 0);
+void LogBlockContainer::BlockDeleted(const scoped_refptr<LogBlock>& block) {
+  DCHECK_GE(block->offset(), 0);
 
-  live_bytes_.IncrementBy(-block_length);
-  live_bytes_aligned_.IncrementBy(-GetAlignedBlockLength(block_offset, block_length));
+  live_bytes_.IncrementBy(-block->length());
+  live_bytes_aligned_.IncrementBy(-block->fs_aligned_length());
   live_blocks_.IncrementBy(-1);
 }
 
@@ -846,77 +954,10 @@ string LogBlockContainer::ToString() const {
   return s;
 }
 
-
-int64_t LogBlockContainer::GetAlignedBlockLength(int64_t block_offset,
-                                                 int64_t block_length) const {
-  uint64_t fs_block_size =
-      data_dir_->instance()->metadata()->filesystem_block_size_bytes();
-
-  // Nearly all blocks are placed on a filesystem block boundary, which means
-  // their length post-alignment is simply their length aligned up to the
-  // nearest fs block size.
-  //
-  // However, due to KUDU-1793, some blocks may start or end at misaligned
-  // offsets. We don't maintain enough state to precisely pinpoint such a
-  // block's (aligned) end offset in this case, so we'll just undercount it.
-  // This should be safe, although it may mean unreclaimed disk space (i.e.
-  // when GetAlignedBlockLength() is used in hole punching).
-  if (PREDICT_TRUE(block_offset % fs_block_size == 0)) {
-    return KUDU_ALIGN_UP(block_length, fs_block_size);
-  }
-  return block_length;
-}
-
 ////////////////////////////////////////////////////////////
-// LogBlock
+// LogBlock (definition)
 ////////////////////////////////////////////////////////////
 
-// The persistent metadata that describes a logical block.
-//
-// A block grows a LogBlock when its data has been synchronized with
-// the disk. That's when it's fully immutable (i.e. none of its metadata
-// can change), and when it becomes readable and persistent.
-//
-// LogBlocks are reference counted to simplify support for deletion with
-// outstanding readers. All refcount increments are performed with the
-// block manager lock held, as are deletion-based decrements. However,
-// no lock is held when ~LogReadableBlock decrements the refcount, thus it
-// must be made thread safe (by extending RefCountedThreadSafe instead of
-// the simpler RefCounted).
-class LogBlock : public RefCountedThreadSafe<LogBlock> {
- public:
-  LogBlock(LogBlockContainer* container, BlockId block_id, int64_t offset,
-           int64_t length);
-  ~LogBlock();
-
-  const BlockId& block_id() const { return block_id_; }
-  LogBlockContainer* container() const { return container_; }
-  int64_t offset() const { return offset_; }
-  int64_t length() const { return length_; }
-
-  // Delete the block. Actual deletion takes place when the
-  // block is destructed.
-  void Delete();
-
- private:
-  // The owning container. Must outlive the LogBlock.
-  LogBlockContainer* container_;
-
-  // The block identifier.
-  const BlockId block_id_;
-
-  // The block's offset in the container.
-  const int64_t offset_;
-
-  // The block's length.
-  const int64_t length_;
-
-  // Whether the block has been marked for deletion.
-  bool deleted_;
-
-  DISALLOW_COPY_AND_ASSIGN(LogBlock);
-};
-
 LogBlock::LogBlock(LogBlockContainer* container, BlockId block_id,
                    int64_t offset, int64_t length)
     : container_(container),
@@ -931,24 +972,40 @@ LogBlock::LogBlock(LogBlockContainer* container, BlockId block_id,
 static void DeleteBlockAsync(LogBlockContainer* container,
                              BlockId block_id,
                              int64_t offset,
-                             int64_t length) {
-  // Use the block's aligned length so that the filesystem can reclaim maximal
-  // disk space.
-  //
+                             int64_t fs_aligned_length) {
   // We don't call SyncData() to synchronize the deletion because it's
   // expensive, and in the worst case, we'll just leave orphaned data
   // behind to be cleaned up in the next GC.
   VLOG(3) << "Freeing space belonging to block " << block_id;
-  WARN_NOT_OK(container->PunchHole(
-      offset, container->GetAlignedBlockLength(offset, length)),
+  WARN_NOT_OK(container->PunchHole(offset, fs_aligned_length),
               Substitute("Could not delete block $0", block_id.ToString()));
 }
 
 LogBlock::~LogBlock() {
   if (deleted_) {
+    // Use the block's aligned length so that the filesystem can reclaim
+    // maximal disk space.
     container_->ExecClosure(Bind(&DeleteBlockAsync, container_, block_id_,
-                                 offset_, length_));
+                                 offset_, fs_aligned_length()));
+  }
+}
+
+int64_t LogBlock::fs_aligned_length() const {
+  uint64_t fs_block_size = container_->instance()->filesystem_block_size_bytes();
+
+  // Nearly all blocks are placed on a filesystem block boundary, which means
+  // their length post-alignment is simply their length aligned up to the
+  // nearest fs block size.
+  //
+  // However, due to KUDU-1793, some blocks may start or end at misaligned
+  // offsets. We don't maintain enough state to precisely pinpoint such a
+  // block's (aligned) end offset in this case, so we'll just undercount it.
+  // This should be safe, although it may mean unreclaimed disk space (i.e.
+  // when fs_aligned_length() is used in hole punching).
+  if (PREDICT_TRUE(offset_ % fs_block_size == 0)) {
+    return KUDU_ALIGN_UP(length_, fs_block_size);
   }
+  return length_;
 }
 
 void LogBlock::Delete() {
@@ -1522,7 +1579,7 @@ Status LogBlockManager::DeleteBlock(const BlockId& block_id) {
   }
   VLOG(3) << "Deleting block " << block_id;
   lb->Delete();
-  lb->container()->BlockDeleted(lb->offset(), lb->length());
+  lb->container()->BlockDeleted(lb);
 
   // Record the on-disk deletion.
   //
@@ -1660,15 +1717,19 @@ bool LogBlockManager::TryUseBlockId(const BlockId& block_id) {
   return InsertIfNotPresent(&open_block_ids_, block_id);
 }
 
-bool LogBlockManager::AddLogBlock(LogBlockContainer* container,
-                                  const BlockId& block_id,
-                                  int64_t offset,
-                                  int64_t length) {
+scoped_refptr<LogBlock> LogBlockManager::AddLogBlock(
+    LogBlockContainer* container,
+    const BlockId& block_id,
+    int64_t offset,
+    int64_t length) {
   std::lock_guard<simple_spinlock> l(lock_);
   scoped_refptr<LogBlock> lb(new LogBlock(container, block_id, offset, length));
   mem_tracker_->Consume(kudu_malloc_usable_size(lb.get()));
 
-  return AddLogBlockUnlocked(lb);
+  if (AddLogBlockUnlocked(lb)) {
+    return lb;
+  }
+  return nullptr;
 }
 
 bool LogBlockManager::AddLogBlockUnlocked(const scoped_refptr<LogBlock>& lb) {
@@ -1763,16 +1824,8 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
       return;
     }
 
-    // Populate the in-memory block maps using each container's records.
-    deque<BlockRecordPB> records;
-    s = container->ReadContainerRecords(&local_report, &records);
-    if (!s.ok()) {
-      *result_status = s.CloneAndPrepend(Substitute(
-          "Could not read records from container $0", container->ToString()));
-      return;
-    }
-
-    // Process the records, building a container-local map.
+    // Process the records, building a container-local map for live blocks and
+    // a list of dead blocks.
     //
     // It's important that we don't try to add these blocks to the global map
     // incrementally as we see each record, since it's possible that one container
@@ -1785,13 +1838,17 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
     // NOTE: Since KUDU-1538, we allocate sequential block IDs, which makes reuse
     // exceedingly unlikely. However, we might have old data which still exhibits
     // the above issue.
-    UntrackedBlockMap blocks_in_container;
-    vector<scoped_refptr<internal::LogBlock>> deleted;
+    UntrackedBlockMap live_blocks;
+    vector<scoped_refptr<internal::LogBlock>> dead_blocks;
     uint64_t max_block_id = 0;
-    for (const BlockRecordPB& r : records) {
-      ProcessBlockRecord(r, &local_report, container.get(),
-                         &blocks_in_container, &deleted);
-      max_block_id = std::max(max_block_id, r.block_id().id());
+    s = container->ProcessRecords(&local_report,
+                                  &live_blocks,
+                                  &dead_blocks,
+                                  &max_block_id);
+    if (!s.ok()) {
+      *result_status = s.CloneAndPrepend(Substitute(
+          "Could not process records in container $0", container->ToString()));
+      return;
     }
 
     // With deleted blocks out of the way, check for misaligned blocks.
@@ -1800,7 +1857,7 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
     // underlying filesystem's block size, an invariant maintained by the log
     // block manager. However, due to KUDU-1793, that invariant may have been
     // broken, so we'll note but otherwise allow it.
-    for (const auto& e : blocks_in_container) {
+    for (const auto& e : live_blocks) {
       if (PREDICT_FALSE(e.second->offset() %
                         container->instance()->filesystem_block_size_bytes() != 0)) {
         local_report.misaligned_block_check->entries.emplace_back(
@@ -1849,7 +1906,7 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
         local_report.full_container_space_check->entries.emplace_back(
             container->ToString(), reported_size - container->live_bytes_aligned());
         need_repunching.insert(need_repunching.end(),
-                               deleted.begin(), deleted.end());
+                               dead_blocks.begin(), dead_blocks.end());
       }
 
       local_report.stats.lbm_full_container_count++;
@@ -1869,7 +1926,7 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
       // memory in a local and add it to the mem-tracker in a single increment
       // at the end of this loop.
       int64_t mem_usage = 0;
-      for (const UntrackedBlockMap::value_type& e : blocks_in_container) {
+      for (const UntrackedBlockMap::value_type& e : live_blocks) {
         if (!AddLogBlockUnlocked(e.second)) {
           // TODO(adar): track as an inconsistency?
           LOG(FATAL) << "Found duplicate CREATE record for block " << e.first
@@ -1899,58 +1956,6 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
   *result_status = Status::OK();
 }
 
-void LogBlockManager::ProcessBlockRecord(
-    const BlockRecordPB& record,
-    FsReport* report,
-    LogBlockContainer* container,
-    UntrackedBlockMap* block_map,
-    vector<scoped_refptr<internal::LogBlock>>* deleted) {
-  BlockId block_id(BlockId::FromPB(record.block_id()));
-  scoped_refptr<LogBlock> lb;
-  switch (record.op_type()) {
-    case CREATE: {
-      lb = new LogBlock(container, block_id, record.offset(), record.length());
-      if (!InsertIfNotPresent(block_map, block_id, lb)) {
-        // TODO(adar): treat as a different kind of inconsistency?
-        report->malformed_record_check->entries.emplace_back(
-            container->ToString(), record);
-        return;
-      }
-
-      VLOG(2) << Substitute("Found CREATE block $0 at offset $1 with length $2",
-                            block_id.ToString(),
-                            record.offset(), record.length());
-
-      // This block must be included in the container's logical size, even if
-      // it has since been deleted. This helps satisfy one of our invariants:
-      // once a container byte range has been used, it may never be reused in
-      // the future.
-      //
-      // If we ignored deleted blocks, we would end up reusing the space
-      // belonging to the last deleted block in the container.
-      container->BlockCreated(record.offset(), record.length());
-      break;
-    }
-    case DELETE:
-      lb = EraseKeyReturnValuePtr(block_map, block_id);
-      if (!lb) {
-        // TODO(adar): treat as a different kind of inconsistency?
-        report->malformed_record_check->entries.emplace_back(
-            container->ToString(), record);
-        return;
-      }
-      VLOG(2) << Substitute("Found DELETE block $0", block_id.ToString());
-      container->BlockDeleted(lb->offset(), lb->length());
-      deleted->emplace_back(std::move(lb));
-      break;
-    default:
-      // TODO(adar): treat as a different kind of inconsistency?
-      report->malformed_record_check->entries.emplace_back(
-          container->ToString(), record);
-      return;
-  }
-}
-
 Status LogBlockManager::Repair(
     FsReport* report,
     vector<scoped_refptr<internal::LogBlock>> need_repunching) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/72e58eca/src/kudu/fs/log_block_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.h b/src/kudu/fs/log_block_manager.h
index d69aeaa..e401446 100644
--- a/src/kudu/fs/log_block_manager.h
+++ b/src/kudu/fs/log_block_manager.h
@@ -238,15 +238,18 @@ class LogBlockManager : public BlockManager {
 
   // Adds a LogBlock to in-memory data structures.
   //
-  // Returns success if the LogBlock was successfully added, failure if it
-  // was already present.
-  bool AddLogBlock(internal::LogBlockContainer* container,
-                   const BlockId& block_id,
-                   int64_t offset,
-                   int64_t length);
+  // Returns the created LogBlock if it was successfully added or nullptr if a
+  // block with that ID was already present.
+  scoped_refptr<internal::LogBlock> AddLogBlock(
+      internal::LogBlockContainer* container,
+      const BlockId& block_id,
+      int64_t offset,
+      int64_t length);
 
   // Unlocked variant of AddLogBlock() for an already-constructed LogBlock object.
   // Must hold 'lock_'.
+  //
+  // Returns true if the LogBlock was successfully added, false if it was already present.
   bool AddLogBlockUnlocked(const scoped_refptr<internal::LogBlock>& lb);
 
   // Removes a LogBlock from in-memory data structures.
@@ -255,17 +258,6 @@ class LogBlockManager : public BlockManager {
   // already gone.
   scoped_refptr<internal::LogBlock> RemoveLogBlock(const BlockId& block_id);
 
-  // Parses a block record, adding or removing it in 'block_map', and
-  // accounting for it in the metadata for 'container'. Deleted blocks are
-  // written to 'deleted'.
-  //
-  // If any record is malformed, it is written to 'report'.
-  void ProcessBlockRecord(const BlockRecordPB& record,
-                          FsReport* report,
-                          internal::LogBlockContainer* container,
-                          UntrackedBlockMap* block_map,
-                          std::vector<scoped_refptr<internal::LogBlock>>* deleted);
-
   // Repairs any inconsistencies described in 'report'. Any blocks in
   // 'need_repunching' will be punched out again.
   //