You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2019/01/09 06:22:08 UTC

[4/4] kudu git commit: [fs]: wrapping up containers in scoped_refptr

[fs]: wrapping up containers in scoped_refptr

Containers need to be wrapped in scoped_refptr so that full
containers that are dead can be deleted at runtime. With this
change in place, KUDU-2636 can be fixed.

Change-Id: I3c5c620014782b3d57dcbe047d0df73c949590c7
Reviewed-on: http://gerrit.cloudera.org:8080/12121
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/b63d9219
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/b63d9219
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/b63d9219

Branch: refs/heads/master
Commit: b63d9219eb6c20d76f61f072af958940891b5c48
Parents: 1147921
Author: helifu <hz...@corp.netease.com>
Authored: Sat Dec 22 22:03:20 2018 +0800
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Wed Jan 9 06:21:36 2019 +0000

----------------------------------------------------------------------
 src/kudu/fs/log_block_manager-test.cc |   6 +-
 src/kudu/fs/log_block_manager.cc      | 224 ++++++++++++++---------------
 src/kudu/fs/log_block_manager.h       |  40 +++---
 3 files changed, 129 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/b63d9219/src/kudu/fs/log_block_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager-test.cc b/src/kudu/fs/log_block_manager-test.cc
index 8c755f0..be997c3 100644
--- a/src/kudu/fs/log_block_manager-test.cc
+++ b/src/kudu/fs/log_block_manager-test.cc
@@ -615,7 +615,7 @@ TEST_F(LogBlockManagerTest, TestMetadataTruncation) {
   // Start corrupting the metadata file in different ways.
 
   string path = LogBlockManager::ContainerPathForTests(
-      bm_->all_containers_by_name_.begin()->second);
+      bm_->all_containers_by_name_.begin()->second.get());
   string metadata_path = path + LogBlockManager::kContainerMetadataFileSuffix;
   string data_path = path + LogBlockManager::kContainerDataFileSuffix;
 
@@ -1704,7 +1704,7 @@ TEST_F(LogBlockManagerTest, TestLIFOContainerSelection) {
   // Create some other blocks, and finalize each block after write.
   // The first available container in the queue will be reused every time.
   internal::LogBlockContainer* container =
-      bm_->available_containers_by_data_dir_.begin()->second.front();
+      bm_->available_containers_by_data_dir_.begin()->second.front().get();
   for (int i = 0; i < 4; i++) {
     unique_ptr<WritableBlock> writer;
     ASSERT_OK(bm_->CreateBlock(test_block_opts_, &writer));
@@ -1713,7 +1713,7 @@ TEST_F(LogBlockManagerTest, TestLIFOContainerSelection) {
     // After finalizing the written block, the used container will be
     // available again and can be reused for the following created block.
     ASSERT_EQ(container,
-              bm_->available_containers_by_data_dir_.begin()->second.front());
+              bm_->available_containers_by_data_dir_.begin()->second.front().get());
     blocks.emplace_back(std::move(writer));
   }
   for (const auto& block : blocks) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/b63d9219/src/kudu/fs/log_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index f1c66f5..7f1669f 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -49,7 +49,6 @@
 #include "kudu/gutil/casts.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/port.h"
-#include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/numbers.h"
 #include "kudu/gutil/strings/strcat.h"
 #include "kudu/gutil/strings/strip.h"
@@ -213,12 +212,12 @@ LogBlockManagerMetrics::LogBlockManagerMetrics(const scoped_refptr<MetricEntity>
 // the simpler RefCounted).
 class LogBlock : public RefCountedThreadSafe<LogBlock> {
  public:
-  LogBlock(LogBlockContainer* container, BlockId block_id, int64_t offset,
+  LogBlock(LogBlockContainerRefPtr container, BlockId block_id, int64_t offset,
            int64_t length);
   ~LogBlock() = default;
 
   const BlockId& block_id() const { return block_id_; }
-  LogBlockContainer* container() const { return container_; }
+  LogBlockContainer* container() const { return container_.get(); }
   int64_t offset() const { return offset_; }
   int64_t length() const { return length_; }
 
@@ -230,8 +229,8 @@ class LogBlock : public RefCountedThreadSafe<LogBlock> {
   void RegisterDeletion(const shared_ptr<LogBlockDeletionTransaction>& transaction);
 
  private:
-  // The owning container. Must outlive the LogBlock.
-  LogBlockContainer* container_;
+  // The owning container.
+  LogBlockContainerRefPtr container_;
 
   // The block identifier.
   const BlockId block_id_;
@@ -258,7 +257,7 @@ class LogBlock : public RefCountedThreadSafe<LogBlock> {
 // persisted.
 class LogWritableBlock : public WritableBlock {
  public:
-  LogWritableBlock(LogBlockContainer* container, BlockId block_id,
+  LogWritableBlock(LogBlockContainerRefPtr container, BlockId block_id,
                    int64_t block_offset);
 
   virtual ~LogWritableBlock();
@@ -296,11 +295,11 @@ class LogWritableBlock : public WritableBlock {
   // Does not synchronize the written data; that takes place in Close().
   Status AppendMetadata();
 
-  LogBlockContainer* container() const { return container_; }
+  LogBlockContainer* container() const { return container_.get(); }
 
  private:
-  // The owning container. Must outlive the block.
-  LogBlockContainer* container_;
+  // The owning container.
+  LogBlockContainerRefPtr container_;
 
   // The block's identifier.
   const BlockId block_id_;
@@ -331,7 +330,10 @@ class LogWritableBlock : public WritableBlock {
 // is logically thread unsafe (i.e. multiple clients calling WriteData()
 // concurrently will produce nonsensical container data). Thread unsafe
 // functions are marked explicitly.
-class LogBlockContainer {
+//
+// Block containers are reference counted so that they can be safely removed
+// despite concurrent access.
+class LogBlockContainer: public RefCountedThreadSafe<LogBlockContainer> {
  public:
   enum SyncMode {
     SYNC,
@@ -343,7 +345,7 @@ class LogBlockContainer {
   // Creates a new block container in 'dir'.
   static Status Create(LogBlockManager* block_manager,
                        DataDir* dir,
-                       unique_ptr<LogBlockContainer>* container);
+                       LogBlockContainerRefPtr* container);
 
   // Opens an existing block container in 'dir'.
   //
@@ -357,8 +359,8 @@ class LogBlockContainer {
   static Status Open(LogBlockManager* block_manager,
                      DataDir* dir,
                      FsReport* report,
-                     const std::string& id,
-                     unique_ptr<LogBlockContainer>* container);
+                     const string& id,
+                     LogBlockContainerRefPtr* container);
 
   // Closes a set of blocks belonging to this container, possibly synchronizing
   // the dirty data and metadata to disk.
@@ -455,11 +457,11 @@ class LogBlockContainer {
       FsReport* report,
       LogBlockManager::UntrackedBlockMap* live_blocks,
       LogBlockManager::BlockRecordMap* live_block_records,
-      std::vector<scoped_refptr<internal::LogBlock>>* dead_blocks,
+      vector<LogBlockRefPtr>* dead_blocks,
       uint64_t* max_block_id);
 
   // Updates internal bookkeeping state to reflect the creation of a block.
-  void BlockCreated(const scoped_refptr<LogBlock>& block);
+  void BlockCreated(const LogBlockRefPtr& block);
 
   // Updates internal bookkeeping state to reflect the deletion of a block.
   //
@@ -467,7 +469,7 @@ class LogBlockContainer {
   // with creations.
   //
   // Note: the container is not made "unfull"; containers remain sparse until deleted.
-  void BlockDeleted(const scoped_refptr<LogBlock>& block);
+  void BlockDeleted(const LogBlockRefPtr& block);
 
   // Finalizes a fully written block. It updates the container data file's position,
   // truncates the container if full and marks the container as available.
@@ -540,7 +542,7 @@ class LogBlockContainer {
       FsReport* report,
       LogBlockManager::UntrackedBlockMap* live_blocks,
       LogBlockManager::BlockRecordMap* live_block_records,
-      std::vector<scoped_refptr<internal::LogBlock>>* dead_blocks,
+      vector<LogBlockRefPtr>* dead_blocks,
       uint64_t* data_file_size,
       uint64_t* max_block_id);
 
@@ -632,7 +634,7 @@ void LogBlockContainer::HandleError(const Status& s) const {
 
 Status LogBlockContainer::Create(LogBlockManager* block_manager,
                                  DataDir* dir,
-                                 unique_ptr<LogBlockContainer>* container) {
+                                 LogBlockContainerRefPtr* container) {
   string common_path;
   string metadata_path;
   string data_path;
@@ -702,7 +704,7 @@ Status LogBlockContainer::Open(LogBlockManager* block_manager,
                                DataDir* dir,
                                FsReport* report,
                                const string& id,
-                               unique_ptr<LogBlockContainer>* container) {
+                               LogBlockContainerRefPtr* container) {
   Env* env = block_manager->env();
   string common_path = JoinPathSegments(dir->dir(), id);
   string metadata_path = StrCat(common_path, LogBlockManager::kContainerMetadataFileSuffix);
@@ -751,10 +753,10 @@ Status LogBlockContainer::Open(LogBlockManager* block_manager,
   RETURN_NOT_OK_CONTAINER_DISK_FAILURE(data_file->Size(&data_file_size));
 
   // Create the in-memory container and populate it.
-  unique_ptr<LogBlockContainer> open_container(new LogBlockContainer(block_manager,
-                                                                     dir,
-                                                                     std::move(metadata_pb_writer),
-                                                                     std::move(data_file)));
+  LogBlockContainerRefPtr open_container(new LogBlockContainer(block_manager,
+                                                               dir,
+                                                               std::move(metadata_pb_writer),
+                                                               std::move(data_file)));
   open_container->preallocated_offset_ = data_file_size;
   VLOG(1) << "Opened log block container " << open_container->ToString();
   container->swap(open_container);
@@ -776,7 +778,7 @@ Status LogBlockContainer::ProcessRecords(
     FsReport* report,
     LogBlockManager::UntrackedBlockMap* live_blocks,
     LogBlockManager::BlockRecordMap* live_block_records,
-    vector<scoped_refptr<internal::LogBlock>>* dead_blocks,
+    vector<LogBlockRefPtr>* dead_blocks,
     uint64_t* max_block_id) {
   string metadata_path = metadata_file_->filename();
   unique_ptr<RandomAccessFile> metadata_reader;
@@ -822,11 +824,11 @@ Status LogBlockContainer::ProcessRecord(
     FsReport* report,
     LogBlockManager::UntrackedBlockMap* live_blocks,
     LogBlockManager::BlockRecordMap* live_block_records,
-    vector<scoped_refptr<internal::LogBlock>>* dead_blocks,
+    vector<LogBlockRefPtr>* dead_blocks,
     uint64_t* data_file_size,
     uint64_t* max_block_id) {
   const BlockId block_id(BlockId::FromPB(record->block_id()));
-  scoped_refptr<LogBlock> lb;
+  LogBlockRefPtr lb;
   switch (record->op_type()) {
     case CREATE:
       // First verify that the record's offset/length aren't wildly incorrect.
@@ -1119,7 +1121,7 @@ void LogBlockContainer::UpdateNextBlockOffset(int64_t block_offset, int64_t bloc
   }
 }
 
-void LogBlockContainer::BlockCreated(const scoped_refptr<LogBlock>& block) {
+void LogBlockContainer::BlockCreated(const LogBlockRefPtr& block) {
   DCHECK_GE(block->offset(), 0);
 
   total_bytes_.IncrementBy(block->fs_aligned_length());
@@ -1129,7 +1131,7 @@ void LogBlockContainer::BlockCreated(const scoped_refptr<LogBlock>& block) {
   live_blocks_.Increment();
 }
 
-void LogBlockContainer::BlockDeleted(const scoped_refptr<LogBlock>& block) {
+void LogBlockContainer::BlockDeleted(const LogBlockRefPtr& block) {
   DCHECK_GE(block->offset(), 0);
 
   live_bytes_.IncrementBy(-block->length());
@@ -1174,16 +1176,16 @@ class LogBlockCreationTransaction : public BlockCreationTransaction {
 
   virtual ~LogBlockCreationTransaction() = default;
 
-  virtual void AddCreatedBlock(std::unique_ptr<WritableBlock> block) override;
+  virtual void AddCreatedBlock(unique_ptr<WritableBlock> block) override;
 
   virtual Status CommitCreatedBlocks() override;
 
  private:
-  std::vector<std::unique_ptr<LogWritableBlock>> created_blocks_;
+  vector<unique_ptr<LogWritableBlock>> created_blocks_;
 };
 
 void LogBlockCreationTransaction::AddCreatedBlock(
-    std::unique_ptr<WritableBlock> block) {
+    unique_ptr<WritableBlock> block) {
   LogWritableBlock* lwb = down_cast<LogWritableBlock*>(block.release());
   created_blocks_.emplace_back(unique_ptr<LogWritableBlock>(lwb));
 }
@@ -1234,23 +1236,25 @@ class LogBlockDeletionTransaction : public BlockDeletionTransaction,
 
   virtual void AddDeletedBlock(BlockId block) override;
 
-  virtual Status CommitDeletedBlocks(std::vector<BlockId>* deleted) override;
+  virtual Status CommitDeletedBlocks(vector<BlockId>* deleted) override;
 
   // Add the given block that needs to be deleted to 'deleted_interval_map_',
   // which keeps track of container and the range to be hole punched.
-  void AddBlock(const scoped_refptr<internal::LogBlock>& lb);
+  void AddBlock(const LogBlockRefPtr& lb);
 
  private:
   // Block <offset, offset + length> pair.
   typedef std::pair<int64_t, int64_t> BlockInterval;
 
   // Map used to aggregate BlockInterval instances across containers.
-  std::unordered_map<internal::LogBlockContainer*,
-      std::vector<BlockInterval>> deleted_interval_map_;
+  unordered_map<LogBlockContainerRefPtr,
+      vector<BlockInterval>,
+      ScopedRefPtrHashFunctor<LogBlockContainer>,
+      ScopedRefPtrEqualToFunctor<LogBlockContainer>> deleted_interval_map_;
 
   // The owning LogBlockManager. Must outlive the LogBlockDeletionTransaction.
   LogBlockManager* lbm_;
-  std::vector<BlockId> deleted_blocks_;
+  vector<BlockId> deleted_blocks_;
   DISALLOW_COPY_AND_ASSIGN(LogBlockDeletionTransaction);
 };
 
@@ -1260,25 +1264,25 @@ void LogBlockDeletionTransaction::AddDeletedBlock(BlockId block) {
 
 LogBlockDeletionTransaction::~LogBlockDeletionTransaction() {
   for (auto& entry : deleted_interval_map_) {
-    LogBlockContainer* container = entry.first;
+    LogBlockContainer* container = entry.first.get();
     CHECK_OK_PREPEND(CoalesceIntervals<int64_t>(&entry.second),
                      Substitute("could not coalesce hole punching for container: $0",
                                 container->ToString()));
 
     for (const auto& interval : entry.second) {
       container->ExecClosure(Bind(&LogBlockContainer::ContainerDeletionAsync,
-                                  Unretained(container),
+                                  container,
                                   interval.first,
                                   interval.second - interval.first));
     }
   }
 }
 
-Status LogBlockDeletionTransaction::CommitDeletedBlocks(std::vector<BlockId>* deleted) {
+Status LogBlockDeletionTransaction::CommitDeletedBlocks(vector<BlockId>* deleted) {
   deleted->clear();
   shared_ptr<LogBlockDeletionTransaction> transaction = shared_from_this();
 
-  vector<scoped_refptr<LogBlock>> log_blocks;
+  vector<LogBlockRefPtr> log_blocks;
   Status first_failure = lbm_->RemoveLogBlocks(deleted_blocks_, &log_blocks, deleted);
   for (const auto& lb : log_blocks) {
     // Register the block to be hole punched if metadata recording
@@ -1299,7 +1303,7 @@ Status LogBlockDeletionTransaction::CommitDeletedBlocks(std::vector<BlockId>* de
   return first_failure;
 }
 
-void LogBlockDeletionTransaction::AddBlock(const scoped_refptr<internal::LogBlock>& lb) {
+void LogBlockDeletionTransaction::AddBlock(const LogBlockRefPtr& lb) {
   DCHECK_GE(lb->fs_aligned_length(), 0);
 
   BlockInterval block_interval(lb->offset(),
@@ -1311,9 +1315,9 @@ void LogBlockDeletionTransaction::AddBlock(const scoped_refptr<internal::LogBloc
 // LogBlock (definition)
 ////////////////////////////////////////////////////////////
 
-LogBlock::LogBlock(LogBlockContainer* container, BlockId block_id,
+LogBlock::LogBlock(LogBlockContainerRefPtr container, BlockId block_id,
                    int64_t offset, int64_t length)
-    : container_(container),
+    : container_(std::move(container)),
       block_id_(block_id),
       offset_(offset),
       length_(length) {
@@ -1351,18 +1355,18 @@ void LogBlock::RegisterDeletion(
 // LogWritableBlock (definition)
 ////////////////////////////////////////////////////////////
 
-LogWritableBlock::LogWritableBlock(LogBlockContainer* container,
+LogWritableBlock::LogWritableBlock(LogBlockContainerRefPtr container,
                                    BlockId block_id, int64_t block_offset)
-    : container_(container),
+    : container_(std::move(container)),
       block_id_(block_id),
       block_offset_(block_offset),
       block_length_(0),
       state_(CLEAN) {
   DCHECK_GE(block_offset, 0);
-  DCHECK_EQ(0, block_offset % container->instance()->filesystem_block_size_bytes());
-  if (container->metrics()) {
-    container->metrics()->generic_metrics.blocks_open_writing->Increment();
-    container->metrics()->generic_metrics.total_writable_blocks->Increment();
+  DCHECK_EQ(0, block_offset % container_->instance()->filesystem_block_size_bytes());
+  if (container_->metrics()) {
+    container_->metrics()->generic_metrics.blocks_open_writing->Increment();
+    container_->metrics()->generic_metrics.total_writable_blocks->Increment();
   }
 }
 
@@ -1524,7 +1528,7 @@ void LogWritableBlock::DoClose() {
     container_->FinalizeBlock(block_offset_, block_length_);
   }
 
-  scoped_refptr<LogBlock> lb = container_->block_manager()->AddLogBlock(
+  LogBlockRefPtr lb = container_->block_manager()->AddLogBlock(
       container_, block_id_, block_offset_, block_length_);
   CHECK(lb);
   container_->BlockCreated(lb);
@@ -1550,8 +1554,7 @@ Status LogWritableBlock::AppendMetadata() {
 // Refers to a LogBlock representing the block's persisted metadata.
 class LogReadableBlock : public ReadableBlock {
  public:
-  LogReadableBlock(LogBlockContainer* container,
-                   scoped_refptr<LogBlock> log_block);
+  explicit LogReadableBlock(LogBlockRefPtr log_block);
 
   virtual ~LogReadableBlock();
 
@@ -1570,11 +1573,8 @@ class LogReadableBlock : public ReadableBlock {
   virtual size_t memory_footprint() const OVERRIDE;
 
  private:
-  // The owning container. Must outlive this block.
-  LogBlockContainer* container_;
-
   // A reference to this block's metadata.
-  scoped_refptr<internal::LogBlock> log_block_;
+  LogBlockRefPtr log_block_;
 
   // Whether or not this block has been closed. Close() is thread-safe, so
   // this must be an atomic primitive.
@@ -1583,14 +1583,12 @@ class LogReadableBlock : public ReadableBlock {
   DISALLOW_COPY_AND_ASSIGN(LogReadableBlock);
 };
 
-LogReadableBlock::LogReadableBlock(LogBlockContainer* container,
-                                   scoped_refptr<LogBlock> log_block)
-  : container_(container),
-    log_block_(std::move(log_block)),
+LogReadableBlock::LogReadableBlock(LogBlockRefPtr log_block)
+  : log_block_(std::move(log_block)),
     closed_(false) {
-  if (container_->metrics()) {
-    container_->metrics()->generic_metrics.blocks_open_reading->Increment();
-    container_->metrics()->generic_metrics.total_readable_blocks->Increment();
+  if (log_block_->container()->metrics()) {
+    log_block_->container()->metrics()->generic_metrics.blocks_open_reading->Increment();
+    log_block_->container()->metrics()->generic_metrics.total_readable_blocks->Increment();
   }
 }
 
@@ -1601,17 +1599,17 @@ LogReadableBlock::~LogReadableBlock() {
 
 Status LogReadableBlock::Close() {
   if (closed_.CompareAndSet(false, true)) {
-    log_block_.reset();
-    if (container_->metrics()) {
-      container_->metrics()->generic_metrics.blocks_open_reading->Decrement();
+    if (log_block_->container()->metrics()) {
+      log_block_->container()->metrics()->generic_metrics.blocks_open_reading->Decrement();
     }
+    log_block_.reset();
   }
 
   return Status::OK();
 }
 
 BlockManager* LogReadableBlock::block_manager() const {
-  return container_->block_manager();
+  return log_block_->container()->block_manager();
 }
 
 const BlockId& LogReadableBlock::id() const {
@@ -1648,7 +1646,7 @@ Status LogReadableBlock::ReadV(uint64_t offset, ArrayView<Slice> results) const
   }
 
   MicrosecondsInt64 start_time = GetMonoTimeMicros();
-  RETURN_NOT_OK(container_->ReadVData(read_offset, results));
+  RETURN_NOT_OK(log_block_->container()->ReadVData(read_offset, results));
   MicrosecondsInt64 end_time = GetMonoTimeMicros();
 
   int64_t dur = end_time - start_time;
@@ -1657,8 +1655,8 @@ Status LogReadableBlock::ReadV(uint64_t offset, ArrayView<Slice> results) const
   const char* counter = BUCKETED_COUNTER_NAME("lbm_reads", dur);
   TRACE_COUNTER_INCREMENT(counter, 1);
 
-  if (container_->metrics()) {
-    container_->metrics()->generic_metrics.total_bytes_read->IncrementBy(read_length);
+  if (log_block_->container()->metrics()) {
+    log_block_->container()->metrics()->generic_metrics.total_bytes_read->IncrementBy(read_length);
   }
   return Status::OK();
 }
@@ -1736,8 +1734,6 @@ LogBlockManager::~LogBlockManager() {
   // Containers may have outstanding tasks running on data directories; wait
   // for them to complete before destroying the containers.
   dd_manager_->WaitOnClosures();
-
-  STLDeleteValues(&all_containers_by_name_);
 }
 
 Status LogBlockManager::Open(FsReport* report) {
@@ -1848,7 +1844,7 @@ Status LogBlockManager::CreateBlock(const CreateBlockOptions& opts,
   //
   // TODO(unknown): should we cap the number of outstanding containers and
   // force callers to block if we've reached it?
-  LogBlockContainer* container;
+  LogBlockContainerRefPtr container;
   RETURN_NOT_OK(GetOrCreateContainer(opts, &container));
 
   // Generate a free block ID.
@@ -1869,7 +1865,7 @@ Status LogBlockManager::CreateBlock(const CreateBlockOptions& opts,
 
 Status LogBlockManager::OpenBlock(const BlockId& block_id,
                                   unique_ptr<ReadableBlock>* block) {
-  scoped_refptr<LogBlock> lb;
+  LogBlockRefPtr lb;
   {
     std::lock_guard<simple_spinlock> l(lock_);
     lb = FindPtrOrNull(blocks_by_block_id_, block_id);
@@ -1878,10 +1874,9 @@ Status LogBlockManager::OpenBlock(const BlockId& block_id,
     return Status::NotFound("Can't find block", block_id.ToString());
   }
 
-  block->reset(new internal::LogReadableBlock(lb->container(),
-                                              lb.get()));
   VLOG(3) << "Opened block " << (*block)->id()
           << " from container " << lb->container()->ToString();
+  block->reset(new internal::LogReadableBlock(std::move(lb)));
   return Status::OK();
 }
 
@@ -1907,7 +1902,7 @@ void LogBlockManager::NotifyBlockId(BlockId block_id) {
   next_block_id_.StoreMax(block_id.id() + 1);
 }
 
-void LogBlockManager::AddNewContainerUnlocked(LogBlockContainer* container) {
+void LogBlockManager::AddNewContainerUnlocked(const LogBlockContainerRefPtr& container) {
   DCHECK(lock_.is_locked());
   InsertOrDie(&all_containers_by_name_, container->ToString(), container);
   if (metrics()) {
@@ -1920,7 +1915,7 @@ void LogBlockManager::AddNewContainerUnlocked(LogBlockContainer* container) {
 
 void LogBlockManager::RemoveFullContainerUnlocked(const string& container_name) {
   DCHECK(lock_.is_locked());
-  unique_ptr<LogBlockContainer> to_delete(EraseKeyReturnValuePtr(
+  LogBlockContainerRefPtr to_delete(EraseKeyReturnValuePtr(
       &all_containers_by_name_, container_name));
   CHECK(to_delete);
   CHECK(to_delete->full())
@@ -1932,7 +1927,7 @@ void LogBlockManager::RemoveFullContainerUnlocked(const string& container_name)
 }
 
 Status LogBlockManager::GetOrCreateContainer(const CreateBlockOptions& opts,
-                                             LogBlockContainer** container) {
+                                             LogBlockContainerRefPtr* container) {
   DataDir* dir;
   RETURN_NOT_OK_EVAL(dd_manager_->GetNextDataDir(opts, &dir),
       error_manager_->RunErrorNotificationCb(ErrorHandlerType::NO_AVAILABLE_DISKS, opts.tablet_id));
@@ -1948,7 +1943,7 @@ Status LogBlockManager::GetOrCreateContainer(const CreateBlockOptions& opts,
   }
 
   // All containers are in use; create a new one.
-  unique_ptr<LogBlockContainer> new_container;
+  LogBlockContainerRefPtr new_container;
   Status s = LogBlockContainer::Create(this, dir, &new_container);
 
   // We could create a container in a different directory, but there's
@@ -1959,24 +1954,24 @@ Status LogBlockManager::GetOrCreateContainer(const CreateBlockOptions& opts,
   {
     std::lock_guard<simple_spinlock> l(lock_);
     dirty_dirs_.insert(dir->dir());
-    AddNewContainerUnlocked(new_container.get());
+    AddNewContainerUnlocked(new_container);
   }
-  *container = new_container.release();
+  *container = std::move(new_container);
   return Status::OK();
 }
 
-void LogBlockManager::MakeContainerAvailable(LogBlockContainer* container) {
+void LogBlockManager::MakeContainerAvailable(LogBlockContainerRefPtr container) {
   std::lock_guard<simple_spinlock> l(lock_);
-  MakeContainerAvailableUnlocked(container);
+  MakeContainerAvailableUnlocked(std::move(container));
 }
 
-void LogBlockManager::MakeContainerAvailableUnlocked(LogBlockContainer* container) {
+void LogBlockManager::MakeContainerAvailableUnlocked(LogBlockContainerRefPtr container) {
   DCHECK(lock_.is_locked());
   if (container->full() || container->read_only()) {
     return;
   }
   VLOG(3) << Substitute("container $0 being made available", container->ToString());
-  available_containers_by_data_dir_[container->data_dir()].push_front(container);
+  available_containers_by_data_dir_[container->data_dir()].push_front(std::move(container));
 }
 
 Status LogBlockManager::SyncContainer(const LogBlockContainer& container) {
@@ -2019,13 +2014,13 @@ bool LogBlockManager::TryUseBlockId(const BlockId& block_id) {
   return InsertIfNotPresent(&open_block_ids_, block_id);
 }
 
-scoped_refptr<LogBlock> LogBlockManager::AddLogBlock(
-    LogBlockContainer* container,
+LogBlockRefPtr LogBlockManager::AddLogBlock(
+    LogBlockContainerRefPtr container,
     const BlockId& block_id,
     int64_t offset,
     int64_t length) {
   std::lock_guard<simple_spinlock> l(lock_);
-  scoped_refptr<LogBlock> lb(new LogBlock(container, block_id, offset, length));
+  LogBlockRefPtr lb(new LogBlock(std::move(container), block_id, offset, length));
   mem_tracker_->Consume(kudu_malloc_usable_size(lb.get()));
 
   if (AddLogBlockUnlocked(lb)) {
@@ -2034,13 +2029,13 @@ scoped_refptr<LogBlock> LogBlockManager::AddLogBlock(
   return nullptr;
 }
 
-bool LogBlockManager::AddLogBlockUnlocked(scoped_refptr<LogBlock> lb) {
+bool LogBlockManager::AddLogBlockUnlocked(LogBlockRefPtr lb) {
   DCHECK(lock_.is_locked());
 
   // InsertIfNotPresent doesn't use move semantics, so instead we just
   // insert an empty scoped_refptr and assign into it down below rather
   // than using the utility function.
-  scoped_refptr<LogBlock>* entry_ptr = &blocks_by_block_id_[lb->block_id()];
+  LogBlockRefPtr* entry_ptr = &blocks_by_block_id_[lb->block_id()];
   if (*entry_ptr) {
     // Already have an entry for this block ID.
     return false;
@@ -2062,15 +2057,15 @@ bool LogBlockManager::AddLogBlockUnlocked(scoped_refptr<LogBlock> lb) {
 }
 
 Status LogBlockManager::RemoveLogBlocks(vector<BlockId> block_ids,
-                                        vector<scoped_refptr<LogBlock>>* log_blocks,
+                                        vector<LogBlockRefPtr>* log_blocks,
                                         vector<BlockId>* deleted) {
   Status first_failure;
-  vector<scoped_refptr<LogBlock>> lbs;
+  vector<LogBlockRefPtr> lbs;
   int64_t malloc_space = 0, blocks_length = 0;
   {
     std::lock_guard<simple_spinlock> l(lock_);
     for (const auto& block_id : block_ids) {
-      scoped_refptr<LogBlock> lb;
+      LogBlockRefPtr lb;
       Status s = RemoveLogBlockUnlocked(block_id, &lb);
       // If we get NotFound, then the block was already deleted.
       if (!s.ok() && !s.IsNotFound()) {
@@ -2127,7 +2122,7 @@ Status LogBlockManager::RemoveLogBlocks(vector<BlockId> block_ids,
 }
 
 Status LogBlockManager::RemoveLogBlockUnlocked(const BlockId& block_id,
-                                               scoped_refptr<internal::LogBlock>* lb) {
+                                               LogBlockRefPtr* lb) {
   auto it = blocks_by_block_id_.find(block_id);
   if (it == blocks_by_block_id_.end()) {
     return Status::NotFound("Can't find block", block_id.ToString());
@@ -2174,7 +2169,7 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
 
   // Keep track of deleted blocks whose space hasn't been punched; they will
   // be repunched during repair.
-  vector<scoped_refptr<internal::LogBlock>> need_repunching;
+  vector<LogBlockRefPtr> need_repunching;
 
   // Keep track of containers that have nothing but dead blocks; they will be
   // deleted during repair.
@@ -2208,7 +2203,7 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
       continue;
     }
 
-    unique_ptr<LogBlockContainer> container;
+    LogBlockContainerRefPtr container;
     s = LogBlockContainer::Open(
         this, dir, &local_report, container_name, &container);
     if (s.IsAborted()) {
@@ -2237,7 +2232,7 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
     // the above issue.
     UntrackedBlockMap live_blocks;
     BlockRecordMap live_block_records;
-    vector<scoped_refptr<internal::LogBlock>> dead_blocks;
+    vector<LogBlockRefPtr> dead_blocks;
     uint64_t max_block_id = 0;
     s = container->ProcessRecords(&local_report,
                                   &live_blocks,
@@ -2401,8 +2396,8 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
       }
 
       mem_tracker_->Consume(mem_usage);
-      AddNewContainerUnlocked(container.get());
-      MakeContainerAvailableUnlocked(container.release());
+      AddNewContainerUnlocked(container);
+      MakeContainerAvailableUnlocked(std::move(container));
     }
   }
 
@@ -2441,7 +2436,7 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
 Status LogBlockManager::Repair(
     DataDir* dir,
     FsReport* report,
-    vector<scoped_refptr<internal::LogBlock>> need_repunching,
+    vector<LogBlockRefPtr> need_repunching,
     vector<string> dead_containers,
     unordered_map<string, vector<BlockRecordPB>> low_live_block_containers) {
   if (opts_.read_only) {
@@ -2456,7 +2451,7 @@ Status LogBlockManager::Repair(
   // From here on out we're committed to repairing.
 
   // Fetch all the containers we're going to need.
-  unordered_map<std::string, internal::LogBlockContainer*> containers_by_name;
+  unordered_map<string, LogBlockContainerRefPtr> containers_by_name;
   {
     std::lock_guard<simple_spinlock> l(lock_);
 
@@ -2469,8 +2464,8 @@ Status LogBlockManager::Repair(
     // Fetch all the containers we're going to need.
     if (report->partial_record_check) {
       for (const auto& pr : report->partial_record_check->entries) {
-        LogBlockContainer* c = FindPtrOrNull(all_containers_by_name_,
-                                             pr.container);
+        LogBlockContainerRefPtr c = FindPtrOrNull(all_containers_by_name_,
+                                                  pr.container);
         if (c) {
           containers_by_name[pr.container] = c;
         }
@@ -2478,16 +2473,16 @@ Status LogBlockManager::Repair(
     }
     if (report->full_container_space_check) {
       for (const auto& fcp : report->full_container_space_check->entries) {
-        LogBlockContainer* c = FindPtrOrNull(all_containers_by_name_,
-                                             fcp.container);
+        LogBlockContainerRefPtr c = FindPtrOrNull(all_containers_by_name_,
+                                                  fcp.container);
         if (c) {
           containers_by_name[fcp.container] = c;
         }
       }
     }
     for (const auto& e : low_live_block_containers) {
-      LogBlockContainer* c = FindPtrOrNull(all_containers_by_name_,
-                                           e.first);
+      LogBlockContainerRefPtr c = FindPtrOrNull(all_containers_by_name_,
+                                                e.first);
       if (c) {
         containers_by_name[e.first] = c;
       }
@@ -2538,8 +2533,7 @@ Status LogBlockManager::Repair(
       unique_ptr<RWFile> file;
       RWFileOptions opts;
       opts.mode = Env::OPEN_EXISTING;
-      internal::LogBlockContainer* container = FindPtrOrNull(containers_by_name,
-                                                             pr.container);
+      LogBlockContainerRefPtr container = FindPtrOrNull(containers_by_name, pr.container);
       if (!container) {
         // The container was deleted outright.
         pr.repaired = true;
@@ -2594,8 +2588,7 @@ Status LogBlockManager::Repair(
   // disk space consumption.
   if (report->full_container_space_check) {
     for (auto& fcp : report->full_container_space_check->entries) {
-      internal::LogBlockContainer* container = FindPtrOrNull(containers_by_name,
-                                                             fcp.container);
+      LogBlockContainerRefPtr container = FindPtrOrNull(containers_by_name, fcp.container);
       if (!container) {
         // The container was deleted outright.
         fcp.repaired = true;
@@ -2631,8 +2624,7 @@ Status LogBlockManager::Repair(
   int64_t metadata_files_compacted = 0;
   int64_t metadata_bytes_delta = 0;
   for (const auto& e : low_live_block_containers) {
-    internal::LogBlockContainer* container = FindPtrOrNull(containers_by_name,
-                                                           e.first);
+    LogBlockContainerRefPtr container = FindPtrOrNull(containers_by_name, e.first);
     if (!container) {
       // The container was deleted outright.
       continue;
@@ -2641,7 +2633,7 @@ Status LogBlockManager::Repair(
     // Rewrite this metadata file. Failures are non-fatal.
     int64_t file_bytes_delta;
     const auto& meta_path = StrCat(e.first, kContainerMetadataFileSuffix);
-    Status s = RewriteMetadataFile(*container, e.second, &file_bytes_delta);
+    Status s = RewriteMetadataFile(*(container.get()), e.second, &file_bytes_delta);
     if (!s.ok()) {
       WARN_NOT_OK(s, "could not rewrite metadata file");
       continue;
@@ -2731,7 +2723,7 @@ Status LogBlockManager::RewriteMetadataFile(const LogBlockContainer& container,
   return Status::OK();
 }
 
-std::string LogBlockManager::ContainerPathForTests(internal::LogBlockContainer* container) {
+string LogBlockManager::ContainerPathForTests(internal::LogBlockContainer* container) {
   return container->ToString();
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/b63d9219/src/kudu/fs/log_block_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.h b/src/kudu/fs/log_block_manager.h
index edcec5c..4ca3ecc 100644
--- a/src/kudu/fs/log_block_manager.h
+++ b/src/kudu/fs/log_block_manager.h
@@ -63,6 +63,9 @@ class LogWritableBlock;
 struct LogBlockManagerMetrics;
 } // namespace internal
 
+typedef scoped_refptr<internal::LogBlock> LogBlockRefPtr;
+typedef scoped_refptr<internal::LogBlockContainer> LogBlockContainerRefPtr;
+
 // A log-backed (i.e. sequentially allocated file) block storage
 // implementation.
 //
@@ -229,10 +232,10 @@ class LogBlockManager : public BlockManager {
   // Type for the actual block map used to store all live blocks.
   // We use sparse_hash_map<> here to reduce memory overhead.
   typedef MemTrackerAllocator<
-      std::pair<const BlockId, scoped_refptr<internal::LogBlock>>> BlockAllocator;
+      std::pair<const BlockId, LogBlockRefPtr>> BlockAllocator;
   typedef spp::sparse_hash_map<
       BlockId,
-      scoped_refptr<internal::LogBlock>,
+      LogBlockRefPtr,
       BlockIdHash,
       BlockIdEqual,
       BlockAllocator> BlockMap;
@@ -242,7 +245,7 @@ class LogBlockManager : public BlockManager {
   // Only used during startup.
   typedef std::unordered_map<
       const BlockId,
-      scoped_refptr<internal::LogBlock>,
+      LogBlockRefPtr,
       BlockIdHash,
       BlockIdEqual> UntrackedBlockMap;
 
@@ -255,17 +258,10 @@ class LogBlockManager : public BlockManager {
       BlockIdHash,
       BlockIdEqual> BlockRecordMap;
 
-  // Map used to aggregate BlockRecordMap instances across containers.
-  //
-  // Only used during startup.
-  typedef std::unordered_map<
-      std::string,
-      std::vector<BlockRecordPB>> BlockRecordsByContainerMap;
-
   // Adds an as of yet unseen container to this block manager.
   //
   // Must be called with 'lock_' held.
-  void AddNewContainerUnlocked(internal::LogBlockContainer* container);
+  void AddNewContainerUnlocked(const LogBlockContainerRefPtr& container);
 
   // Removes a previously added container from this block manager. The
   // container must be full.
@@ -280,12 +276,12 @@ class LogBlockManager : public BlockManager {
   // writing is finished, call MakeContainerAvailable() to make it
   // available to other writers.
   Status GetOrCreateContainer(const CreateBlockOptions& opts,
-                              internal::LogBlockContainer** container);
+                              LogBlockContainerRefPtr* container);
 
   // Indicate that this container is no longer in use and can be handed out
   // to other writers.
-  void MakeContainerAvailable(internal::LogBlockContainer* container);
-  void MakeContainerAvailableUnlocked(internal::LogBlockContainer* container);
+  void MakeContainerAvailable(LogBlockContainerRefPtr container);
+  void MakeContainerAvailableUnlocked(LogBlockContainerRefPtr container);
 
   // Synchronizes a container's dirty metadata to disk, taking care not to
   // sync more than is necessary (using 'dirty_dirs_').
@@ -301,8 +297,8 @@ class LogBlockManager : public BlockManager {
   //
   // Returns the created LogBlock if it was successfully added or nullptr if a
   // block with that ID was already present.
-  scoped_refptr<internal::LogBlock> AddLogBlock(
-      internal::LogBlockContainer* container,
+  LogBlockRefPtr AddLogBlock(
+      LogBlockContainerRefPtr container,
       const BlockId& block_id,
       int64_t offset,
       int64_t length);
@@ -311,7 +307,7 @@ class LogBlockManager : public BlockManager {
   // Must hold 'lock_'.
   //
   // Returns true if the LogBlock was successfully added, false if it was already present.
-  bool AddLogBlockUnlocked(scoped_refptr<internal::LogBlock> lb);
+  bool AddLogBlockUnlocked(LogBlockRefPtr lb);
 
   // Removes the given set of LogBlocks from in-memory data structures, and
   // appends the block deletion metadata to record the on-disk deletion.
@@ -321,7 +317,7 @@ class LogBlockManager : public BlockManager {
   //
   // Returns the first deletion failure that was seen, if any.
   Status RemoveLogBlocks(std::vector<BlockId> block_ids,
-                         std::vector<scoped_refptr<internal::LogBlock>>* log_blocks,
+                         std::vector<LogBlockRefPtr>* log_blocks,
                          std::vector<BlockId>* deleted);
 
   // Removes a LogBlock from in-memory data structures. Must hold 'lock_'.
@@ -329,7 +325,7 @@ class LogBlockManager : public BlockManager {
   //
   // Returns an error if LogBlock cannot be successfully removed.
   Status RemoveLogBlockUnlocked(const BlockId& block_id,
-                                scoped_refptr<internal::LogBlock>* lb);
+                                LogBlockRefPtr* lb);
 
   // Repairs any inconsistencies for 'dir' described in 'report'.
   //
@@ -342,7 +338,7 @@ class LogBlockManager : public BlockManager {
   // Returns an error if repairing a fatal inconsistency failed.
   Status Repair(DataDir* dir,
                 FsReport* report,
-                std::vector<scoped_refptr<internal::LogBlock>> need_repunching,
+                std::vector<LogBlockRefPtr> need_repunching,
                 std::vector<std::string> dead_containers,
                 std::unordered_map<
                     std::string,
@@ -434,14 +430,14 @@ class LogBlockManager : public BlockManager {
 
   // Holds (and owns) all containers loaded from disk.
   std::unordered_map<std::string,
-                     internal::LogBlockContainer*> all_containers_by_name_;
+                     LogBlockContainerRefPtr> all_containers_by_name_;
 
   // Holds only those containers that are currently available for writing,
   // excluding containers that are either in use or full.
   //
   // Does not own the containers.
   std::unordered_map<const DataDir*,
-                     std::deque<internal::LogBlockContainer*>> available_containers_by_data_dir_;
+                     std::deque<LogBlockContainerRefPtr>> available_containers_by_data_dir_;
 
   // Tracks dirty container directories.
   //