You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2017/10/25 18:49:11 UTC

[2/2] kudu git commit: KUDU-2055 [part 5]: Coalesce hole punch for LBM

KUDU-2055 [part 5]: Coalesce hole punch for LBM

This patch extends the implementation of BlockDeletionTransaction to
actually coalesce hole punch in LBM, so that blocks in one container
that are contiguous are grouped together in a hole punch operation.

It also adds a new metric 'holes_punched' in log block manager to track
the number of hole punching operations. And another two metrics
'blocks_created' and 'blocks_deleted' in block manager to track blocks
that were created and deleted since service start respectively.

It updates unit test LogBlockManagerTest.MetricsTest, to verify that
coalescing hole punching works as expected by checking the value of
'holes_punched' metric.

Change-Id: I7ccc9538b8184f8f7ba8f6118713b82fef433275
Reviewed-on: http://gerrit.cloudera.org:8080/8162
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@apache.org>
Reviewed-by: Adar Dembo <ad...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/15f3f9b2
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/15f3f9b2
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/15f3f9b2

Branch: refs/heads/master
Commit: 15f3f9b2f7421b27dfb2bbdd8f6314939ef7c993
Parents: dfaaf9a
Author: hahao <ha...@cloudera.com>
Authored: Sun Oct 15 23:05:41 2017 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Wed Oct 25 18:48:47 2017 +0000

----------------------------------------------------------------------
 src/kudu/fs/block_manager-test.cc     |  46 +++--
 src/kudu/fs/block_manager_metrics.cc  |  12 ++
 src/kudu/fs/block_manager_metrics.h   |   2 +
 src/kudu/fs/file_block_manager.cc     |   4 +
 src/kudu/fs/log_block_manager-test.cc | 221 +++++++++++++++++----
 src/kudu/fs/log_block_manager.cc      | 306 ++++++++++++++++++-----------
 src/kudu/fs/log_block_manager.h       |  27 ++-
 src/kudu/util/metrics.cc              |   2 +
 src/kudu/util/metrics.h               |   1 +
 9 files changed, 434 insertions(+), 187 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/15f3f9b2/src/kudu/fs/block_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager-test.cc b/src/kudu/fs/block_manager-test.cc
index 68a71df..8ae1c66 100644
--- a/src/kudu/fs/block_manager-test.cc
+++ b/src/kudu/fs/block_manager-test.cc
@@ -666,12 +666,14 @@ TYPED_TEST(BlockManagerTest, PersistenceTest) {
   ASSERT_OK(this->bm_->CreateBlock(this->test_block_opts_, &written_block3));
   ASSERT_OK(written_block3->Append(test_data));
   ASSERT_OK(written_block3->Close());
-  shared_ptr<BlockDeletionTransaction> deletion_transaction =
-      this->bm_->NewDeletionTransaction();
-  deletion_transaction->AddDeletedBlock(written_block3->id());
-  vector<BlockId> deleted;
-  ASSERT_OK(deletion_transaction->CommitDeletedBlocks(&deleted));
-  ASSERT_EQ(1, deleted.size());
+  {
+    shared_ptr<BlockDeletionTransaction> deletion_transaction =
+        this->bm_->NewDeletionTransaction();
+    deletion_transaction->AddDeletedBlock(written_block3->id());
+    vector<BlockId> deleted;
+    ASSERT_OK(deletion_transaction->CommitDeletedBlocks(&deleted));
+    ASSERT_EQ(1, deleted.size());
+  }
 
   // Reopen the block manager. This may read block metadata from disk.
   //
@@ -956,23 +958,25 @@ TYPED_TEST(BlockManagerTest, TestMetadataOkayDespiteFailure) {
 
     int num_deleted = 0;
     int num_deleted_attempts = 0;
-    shared_ptr<BlockDeletionTransaction> deletion_transaction =
-        this->bm_->NewDeletionTransaction();
-    for (auto it = ids.begin(); it != ids.end();) {
-      // TODO(adar): the lbm removes a block from its block map even if the
-      // on-disk deletion fails. When that's fixed, update this code to
-      // erase() only if s.ok().
-      deletion_transaction->AddDeletedBlock(*it);
-      it = ids.erase(it);
-      num_deleted_attempts++;
-
-      // Skip every other block.
-      if (it != ids.end()) {
-        it++;
+    vector<BlockId> deleted;
+    {
+      shared_ptr<BlockDeletionTransaction> deletion_transaction =
+          this->bm_->NewDeletionTransaction();
+      for (auto it = ids.begin(); it != ids.end();) {
+        // TODO(adar): the lbm removes a block from its block map even if the
+        // on-disk deletion fails. When that's fixed, update this code to
+        // erase() only if s.ok().
+        deletion_transaction->AddDeletedBlock(*it);
+        it = ids.erase(it);
+        num_deleted_attempts++;
+
+        // Skip every other block.
+        if (it != ids.end()) {
+          it++;
+        }
       }
+      ignore_result(deletion_transaction->CommitDeletedBlocks(&deleted));
     }
-    vector<BlockId> deleted;
-    ignore_result(deletion_transaction->CommitDeletedBlocks(&deleted));
     num_deleted += deleted.size();
     LOG(INFO) << Substitute("Successfully deleted $0 blocks on $1 attempts",
                             num_deleted, num_deleted_attempts);

http://git-wip-us.apache.org/repos/asf/kudu/blob/15f3f9b2/src/kudu/fs/block_manager_metrics.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager_metrics.cc b/src/kudu/fs/block_manager_metrics.cc
index 26bb50e..a116e94 100644
--- a/src/kudu/fs/block_manager_metrics.cc
+++ b/src/kudu/fs/block_manager_metrics.cc
@@ -39,6 +39,16 @@ METRIC_DEFINE_counter(server, block_manager_total_readable_blocks,
                       kudu::MetricUnit::kBlocks,
                       "Number of data blocks opened for reading since service start");
 
+METRIC_DEFINE_counter(server, block_manager_total_blocks_created,
+                      "Data Blocks Created",
+                      kudu::MetricUnit::kBlocks,
+                      "Number of data blocks that were created since service start");
+
+METRIC_DEFINE_counter(server, block_manager_total_blocks_deleted,
+                      "Data Blocks Deleted",
+                      kudu::MetricUnit::kBlocks,
+                      "Number of data blocks that were deleted since service start");
+
 METRIC_DEFINE_counter(server, block_manager_total_bytes_written,
                       "Block Data Bytes Written",
                       kudu::MetricUnit::kBytes,
@@ -65,6 +75,8 @@ BlockManagerMetrics::BlockManagerMetrics(const scoped_refptr<MetricEntity>& enti
     GINIT(blocks_open_writing),
     MINIT(total_readable_blocks),
     MINIT(total_writable_blocks),
+    MINIT(total_blocks_created),
+    MINIT(total_blocks_deleted),
     MINIT(total_bytes_read),
     MINIT(total_bytes_written),
     MINIT(total_disk_sync) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/15f3f9b2/src/kudu/fs/block_manager_metrics.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager_metrics.h b/src/kudu/fs/block_manager_metrics.h
index 03d9872..0a8cc0c 100644
--- a/src/kudu/fs/block_manager_metrics.h
+++ b/src/kudu/fs/block_manager_metrics.h
@@ -34,6 +34,8 @@ struct BlockManagerMetrics {
 
   scoped_refptr<Counter> total_readable_blocks;
   scoped_refptr<Counter> total_writable_blocks;
+  scoped_refptr<Counter> total_blocks_created;
+  scoped_refptr<Counter> total_blocks_deleted;
   scoped_refptr<Counter> total_bytes_read;
   scoped_refptr<Counter> total_bytes_written;
   scoped_refptr<Counter> total_disk_sync;

http://git-wip-us.apache.org/repos/asf/kudu/blob/15f3f9b2/src/kudu/fs/file_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/file_block_manager.cc b/src/kudu/fs/file_block_manager.cc
index 788c191..7e0b4c3 100644
--- a/src/kudu/fs/file_block_manager.cc
+++ b/src/kudu/fs/file_block_manager.cc
@@ -410,6 +410,7 @@ Status FileWritableBlock::Close(SyncMode mode) {
   if (block_manager_->metrics_) {
     block_manager_->metrics_->blocks_open_writing->Decrement();
     block_manager_->metrics_->total_bytes_written->IncrementBy(BytesAppended());
+    block_manager_->metrics_->total_blocks_created->Increment();
   }
 
   // Either Close() or Sync() could have run into an error.
@@ -624,6 +625,9 @@ Status FileBlockDeletionTransaction::CommitDeletedBlocks(std::vector<BlockId>* d
       if (first_failure.ok()) first_failure = s;
     } else {
       deleted->emplace_back(block);
+      if (s.ok() && fbm_->metrics_) {
+        fbm_->metrics_->total_blocks_deleted->Increment();
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/15f3f9b2/src/kudu/fs/log_block_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager-test.cc b/src/kudu/fs/log_block_manager-test.cc
index 929cb02..44b89bb 100644
--- a/src/kudu/fs/log_block_manager-test.cc
+++ b/src/kudu/fs/log_block_manager-test.cc
@@ -85,9 +85,13 @@ DECLARE_string(env_inject_eio_globs);
 DECLARE_uint64(log_container_preallocate_bytes);
 DECLARE_uint64(log_container_max_size);
 
+// Block manager metrics.
+METRIC_DECLARE_counter(block_manager_total_blocks_deleted);
+
 // Log block manager metrics.
 METRIC_DECLARE_gauge_uint64(log_block_manager_bytes_under_management);
 METRIC_DECLARE_gauge_uint64(log_block_manager_blocks_under_management);
+METRIC_DECLARE_counter(log_block_manager_holes_punched);
 METRIC_DECLARE_gauge_uint64(log_block_manager_containers);
 METRIC_DECLARE_gauge_uint64(log_block_manager_full_containers);
 
@@ -250,28 +254,43 @@ class LogBlockManagerTest : public KuduTest {
   }
 };
 
+static void CheckGaugeMetric(const scoped_refptr<MetricEntity>& entity,
+                             int expected_value, const MetricPrototype* prototype) {
+  AtomicGauge<uint64_t>* gauge = down_cast<AtomicGauge<uint64_t>*>(
+      entity->FindOrNull(*prototype).get());
+  DCHECK_NOTNULL(gauge);
+  ASSERT_EQ(expected_value, gauge->value());
+}
+
+static void CheckCounterMetric(const scoped_refptr<MetricEntity>& entity,
+                               int expected_value, const MetricPrototype* prototype) {
+  Counter* counter = down_cast<Counter*>(entity->FindOrNull(*prototype).get());
+  DCHECK_NOTNULL(counter);
+  ASSERT_EQ(expected_value, counter->value());
+}
+
 static void CheckLogMetrics(const scoped_refptr<MetricEntity>& entity,
-                            int bytes_under_management, int blocks_under_management,
-                            int containers, int full_containers) {
-  ASSERT_EQ(bytes_under_management, down_cast<AtomicGauge<uint64_t>*>(
-                entity->FindOrNull(METRIC_log_block_manager_bytes_under_management)
-                .get())->value());
-  ASSERT_EQ(blocks_under_management, down_cast<AtomicGauge<uint64_t>*>(
-                entity->FindOrNull(METRIC_log_block_manager_blocks_under_management)
-                .get())->value());
-  ASSERT_EQ(containers, down_cast<AtomicGauge<uint64_t>*>(
-                entity->FindOrNull(METRIC_log_block_manager_containers)
-                .get())->value());
-  ASSERT_EQ(full_containers, down_cast<AtomicGauge<uint64_t>*>(
-                entity->FindOrNull(METRIC_log_block_manager_full_containers)
-                .get())->value());
+                            const vector<std::pair<int, const MetricPrototype*>> gauge_values,
+                            const vector<std::pair<int, const MetricPrototype*>> counter_values) {
+  for (const auto& gauge_value : gauge_values) {
+    CheckGaugeMetric(entity, gauge_value.first, gauge_value.second);
+  }
+  for (const auto& counter_value: counter_values) {
+    CheckCounterMetric(entity, counter_value.first, counter_value.second);
+  }
 }
 
 TEST_F(LogBlockManagerTest, MetricsTest) {
   MetricRegistry registry;
   scoped_refptr<MetricEntity> entity = METRIC_ENTITY_server.Instantiate(&registry, "test");
   ASSERT_OK(ReopenBlockManager(entity));
-  ASSERT_NO_FATAL_FAILURE(CheckLogMetrics(entity, 0, 0, 0, 0));
+  NO_FATALS(CheckLogMetrics(entity,
+      { {0, &METRIC_log_block_manager_bytes_under_management},
+        {0, &METRIC_log_block_manager_blocks_under_management},
+        {0, &METRIC_log_block_manager_containers},
+        {0, &METRIC_log_block_manager_full_containers} },
+      { {0, &METRIC_log_block_manager_holes_punched},
+        {0, &METRIC_block_manager_total_blocks_deleted} }));
 
   // Lower the max container size so that we can more easily test full
   // container metrics.
@@ -280,11 +299,23 @@ TEST_F(LogBlockManagerTest, MetricsTest) {
   // One block --> one container.
   unique_ptr<WritableBlock> writer;
   ASSERT_OK(bm_->CreateBlock(test_block_opts_, &writer));
-  ASSERT_NO_FATAL_FAILURE(CheckLogMetrics(entity, 0, 0, 1, 0));
+  NO_FATALS(CheckLogMetrics(entity,
+      { {0, &METRIC_log_block_manager_bytes_under_management},
+        {0, &METRIC_log_block_manager_blocks_under_management},
+        {1, &METRIC_log_block_manager_containers},
+        {0, &METRIC_log_block_manager_full_containers} },
+      { {0, &METRIC_log_block_manager_holes_punched},
+        {0, &METRIC_block_manager_total_blocks_deleted} }));
 
   // And when the block is closed, it becomes "under management".
   ASSERT_OK(writer->Close());
-  ASSERT_NO_FATAL_FAILURE(CheckLogMetrics(entity, 0, 1, 1, 0));
+  NO_FATALS(CheckLogMetrics(entity,
+      { {0, &METRIC_log_block_manager_bytes_under_management},
+        {1, &METRIC_log_block_manager_blocks_under_management},
+        {1, &METRIC_log_block_manager_containers},
+        {0, &METRIC_log_block_manager_full_containers} },
+      { {0, &METRIC_log_block_manager_holes_punched},
+        {0, &METRIC_block_manager_total_blocks_deleted} }));
 
   // Create 10 blocks concurrently. We reuse the existing container and
   // create 9 new ones. All of them get filled.
@@ -307,10 +338,22 @@ TEST_F(LogBlockManagerTest, MetricsTest) {
       transaction->AddCreatedBlock(std::move(b));
     }
     // Metrics for full containers are updated after Finalize().
-    ASSERT_NO_FATAL_FAILURE(CheckLogMetrics(entity, 0, 1, 10, 10));
+    NO_FATALS(CheckLogMetrics(entity,
+        { {0, &METRIC_log_block_manager_bytes_under_management},
+          {1, &METRIC_log_block_manager_blocks_under_management},
+          {10, &METRIC_log_block_manager_containers},
+          {10, &METRIC_log_block_manager_full_containers} },
+        { {0, &METRIC_log_block_manager_holes_punched},
+          {0, &METRIC_block_manager_total_blocks_deleted} }));
 
     ASSERT_OK(transaction->CommitCreatedBlocks());
-    ASSERT_NO_FATAL_FAILURE(CheckLogMetrics(entity, 10 * 1024, 11, 10, 10));
+    NO_FATALS(CheckLogMetrics(entity,
+        { {10 * 1024, &METRIC_log_block_manager_bytes_under_management},
+          {11, &METRIC_log_block_manager_blocks_under_management},
+          {10, &METRIC_log_block_manager_containers},
+          {10, &METRIC_log_block_manager_full_containers} },
+        { {0, &METRIC_log_block_manager_holes_punched},
+          {0, &METRIC_block_manager_total_blocks_deleted} }));
   }
 
   // Reopen the block manager and test the metrics. They're all based on
@@ -318,15 +361,85 @@ TEST_F(LogBlockManagerTest, MetricsTest) {
   MetricRegistry new_registry;
   scoped_refptr<MetricEntity> new_entity = METRIC_ENTITY_server.Instantiate(&new_registry, "test");
   ASSERT_OK(ReopenBlockManager(new_entity));
-  ASSERT_NO_FATAL_FAILURE(CheckLogMetrics(new_entity, 10 * 1024, 11, 10, 10));
+  NO_FATALS(CheckLogMetrics(new_entity,
+      { {10 * 1024, &METRIC_log_block_manager_bytes_under_management},
+        {11, &METRIC_log_block_manager_blocks_under_management},
+        {10, &METRIC_log_block_manager_containers},
+        {10, &METRIC_log_block_manager_full_containers} },
+      { {0, &METRIC_log_block_manager_holes_punched},
+        {0, &METRIC_block_manager_total_blocks_deleted} }));
 
   // Delete a block. Its contents should no longer be under management.
-  shared_ptr<BlockDeletionTransaction> deletion_transaction =
-      bm_->NewDeletionTransaction();
-  deletion_transaction->AddDeletedBlock(saved_id);
-  vector<BlockId> deleted;
-  ASSERT_OK(deletion_transaction->CommitDeletedBlocks(&deleted));
-  ASSERT_NO_FATAL_FAILURE(CheckLogMetrics(new_entity, 9 * 1024, 10, 10, 10));
+  {
+    shared_ptr<BlockDeletionTransaction> deletion_transaction =
+        bm_->NewDeletionTransaction();
+    deletion_transaction->AddDeletedBlock(saved_id);
+    vector<BlockId> deleted;
+    ASSERT_OK(deletion_transaction->CommitDeletedBlocks(&deleted));
+    NO_FATALS(CheckLogMetrics(new_entity,
+        { {9 * 1024, &METRIC_log_block_manager_bytes_under_management},
+          {10, &METRIC_log_block_manager_blocks_under_management},
+          {10, &METRIC_log_block_manager_containers},
+          {10, &METRIC_log_block_manager_full_containers} },
+        { {0, &METRIC_log_block_manager_holes_punched},
+          {1, &METRIC_block_manager_total_blocks_deleted} }));
+  }
+  // Wait for the actual hole punching to take place.
+  for (const auto& data_dir : dd_manager_->data_dirs()) {
+    data_dir->WaitOnClosures();
+  }
+  NO_FATALS(CheckLogMetrics(new_entity,
+      { {9 * 1024, &METRIC_log_block_manager_bytes_under_management},
+        {10, &METRIC_log_block_manager_blocks_under_management},
+        {10, &METRIC_log_block_manager_containers},
+        {10, &METRIC_log_block_manager_full_containers} },
+      { {1, &METRIC_log_block_manager_holes_punched},
+        {1, &METRIC_block_manager_total_blocks_deleted} }));
+
+  // Set the max container size to default so that we can create a bunch of blocks
+  // in the same container. Delete those created blocks afterwards to verify only
+  // one hole punch operation is executed since the blocks are contiguous.
+  FLAGS_log_container_max_size = 10LU * 1024 * 1024 * 1024;
+  {
+    vector<BlockId> blocks;
+    unique_ptr<BlockCreationTransaction> transaction = bm_->NewCreationTransaction();
+    for (int i = 0; i < 10; i++) {
+      unique_ptr<WritableBlock> b;
+      ASSERT_OK(bm_->CreateBlock(test_block_opts_, &b));
+      blocks.emplace_back(b->id());
+      b->Append("test data");
+      ASSERT_OK(b->Finalize());
+      transaction->AddCreatedBlock(std::move(b));
+    }
+    ASSERT_OK(transaction->CommitCreatedBlocks());
+
+    shared_ptr<BlockDeletionTransaction> deletion_transaction =
+        bm_->NewDeletionTransaction();
+    for (const auto& block : blocks) {
+      deletion_transaction->AddDeletedBlock(block);
+    }
+    vector<BlockId> deleted;
+    ASSERT_OK(deletion_transaction->CommitDeletedBlocks(&deleted));
+    ASSERT_EQ(blocks.size(), deleted.size());
+    NO_FATALS(CheckLogMetrics(new_entity,
+        { {9 * 1024, &METRIC_log_block_manager_bytes_under_management},
+          {10, &METRIC_log_block_manager_blocks_under_management},
+          {11, &METRIC_log_block_manager_containers},
+          {10, &METRIC_log_block_manager_full_containers} },
+        { {1, &METRIC_log_block_manager_holes_punched},
+          {11, &METRIC_block_manager_total_blocks_deleted} }));
+  }
+  // Wait for the actual hole punching to take place.
+  for (const auto& data_dir : dd_manager_->data_dirs()) {
+    data_dir->WaitOnClosures();
+  }
+  NO_FATALS(CheckLogMetrics(new_entity,
+      { {9 * 1024, &METRIC_log_block_manager_bytes_under_management},
+        {10, &METRIC_log_block_manager_blocks_under_management},
+        {11, &METRIC_log_block_manager_containers},
+        {10, &METRIC_log_block_manager_full_containers} },
+      { {2, &METRIC_log_block_manager_holes_punched},
+        {11, &METRIC_block_manager_total_blocks_deleted} }));
 }
 
 TEST_F(LogBlockManagerTest, ContainerPreallocationTest) {
@@ -402,8 +515,14 @@ TEST_F(LogBlockManagerTest, TestReuseBlockIds) {
   ASSERT_EQ(4, bm_->all_containers_by_name_.size());
 
   // Delete the original blocks.
-  for (const BlockId& b : block_ids) {
-    ASSERT_OK(bm_->DeleteBlock(b));
+  {
+    shared_ptr<BlockDeletionTransaction> deletion_transaction =
+        bm_->NewDeletionTransaction();
+    for (const BlockId& b : block_ids) {
+      deletion_transaction->AddDeletedBlock(b);
+    }
+    vector<BlockId> deleted;
+    ASSERT_OK(deletion_transaction->CommitDeletedBlocks(&deleted));
   }
 
   // Reset the block ID sequence and re-create new blocks which should reuse the same
@@ -500,7 +619,13 @@ TEST_F(LogBlockManagerTest, TestMetadataTruncation) {
   // Delete the first block we created. This necessitates writing to the
   // metadata file of the originally-written container, since we append a
   // delete record to the metadata.
-  ASSERT_OK(bm_->DeleteBlock(created_blocks[0]));
+  {
+    shared_ptr<BlockDeletionTransaction> deletion_transaction =
+        bm_->NewDeletionTransaction();
+    deletion_transaction->AddDeletedBlock(created_blocks[0]);
+    vector<BlockId> deleted;
+    ASSERT_OK(deletion_transaction->CommitDeletedBlocks(&deleted));
+  }
   ASSERT_OK(bm_->GetAllBlockIds(&block_ids));
   ASSERT_EQ(3, block_ids.size());
 
@@ -998,18 +1123,20 @@ TEST_F(LogBlockManagerTest, TestMisalignedBlocksFuzz) {
     ASSERT_EQ(container_name, mb.container);
   }
 
-  shared_ptr<BlockDeletionTransaction> deletion_transaction =
-      this->bm_->NewDeletionTransaction();
   // Delete about half of them, chosen randomly.
   vector<BlockId> block_ids;
-  ASSERT_OK(bm_->GetAllBlockIds(&block_ids));
-  for (const auto& id : block_ids) {
-    if (rand() % 2) {
-      deletion_transaction->AddDeletedBlock(id);
+  {
+    shared_ptr<BlockDeletionTransaction> deletion_transaction =
+        this->bm_->NewDeletionTransaction();
+    ASSERT_OK(bm_->GetAllBlockIds(&block_ids));
+    for (const auto& id : block_ids) {
+      if (rand() % 2) {
+        deletion_transaction->AddDeletedBlock(id);
+      }
     }
+    vector<BlockId> deleted;
+    ASSERT_OK(deletion_transaction->CommitDeletedBlocks(&deleted));
   }
-  vector<BlockId> deleted;
-  ASSERT_OK(deletion_transaction->CommitDeletedBlocks(&deleted));
 
   // Wait for the block manager to punch out all of the holes. It's easiest to
   // do this by reopening it; shutdown will wait for outstanding hole punches.
@@ -1305,11 +1432,13 @@ TEST_F(LogBlockManagerTest, TestDeleteDeadContainersAtStartup) {
 
   // Delete the one block and reopen it again. The container files should have
   // been deleted.
-  shared_ptr<BlockDeletionTransaction> deletion_transaction =
-      this->bm_->NewDeletionTransaction();
-  deletion_transaction->AddDeletedBlock(block->id());
-  vector<BlockId> deleted;
-  ASSERT_OK(deletion_transaction->CommitDeletedBlocks(&deleted));
+  {
+    shared_ptr<BlockDeletionTransaction> deletion_transaction =
+        this->bm_->NewDeletionTransaction();
+    deletion_transaction->AddDeletedBlock(block->id());
+    vector<BlockId> deleted;
+    ASSERT_OK(deletion_transaction->CommitDeletedBlocks(&deleted));
+  }
   ASSERT_OK(ReopenBlockManager());
   ASSERT_FALSE(env_->FileExists(data_file_name));
   ASSERT_FALSE(env_->FileExists(metadata_file_name));
@@ -1344,7 +1473,13 @@ TEST_F(LogBlockManagerTest, TestCompactFullContainerMetadataAtStartup) {
   int64_t last_live_aligned_bytes;
   int num_blocks_deleted = 0;
   for (const auto& id : block_ids) {
-    ASSERT_OK(bm_->DeleteBlock(id));
+    {
+      shared_ptr<BlockDeletionTransaction> deletion_transaction =
+          bm_->NewDeletionTransaction();
+      deletion_transaction->AddDeletedBlock(id);
+      vector<BlockId> deleted;
+      ASSERT_OK(deletion_transaction->CommitDeletedBlocks(&deleted));
+    }
     num_blocks_deleted++;
     FsReport report;
     ASSERT_OK(ReopenBlockManager(nullptr, &report));

http://git-wip-us.apache.org/repos/asf/kudu/blob/15f3f9b2/src/kudu/fs/log_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index 9bfe88d..2dc1a1a 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -71,6 +71,7 @@
 #include "kudu/util/random_util.h"
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/slice.h"
+#include "kudu/util/sorted_disjoint_interval_list.h"
 #include "kudu/util/test_util_prod.h"
 #include "kudu/util/trace.h"
 
@@ -132,12 +133,18 @@ METRIC_DEFINE_gauge_uint64(server, log_block_manager_full_containers,
                            kudu::MetricUnit::kLogBlockContainers,
                            "Number of full log block containers");
 
+METRIC_DEFINE_counter(server, log_block_manager_holes_punched,
+                      "Number of Holes Punched",
+                      kudu::MetricUnit::kHoles,
+                      "Number of holes punched since service start");
+
 namespace kudu {
 
 namespace fs {
 
 using internal::LogBlock;
 using internal::LogBlockContainer;
+using internal::LogBlockDeletionTransaction;
 using internal::LogWritableBlock;
 using pb_util::ReadablePBContainerFile;
 using pb_util::WritablePBContainerFile;
@@ -173,15 +180,19 @@ struct LogBlockManagerMetrics {
 
   scoped_refptr<AtomicGauge<uint64_t>> containers;
   scoped_refptr<AtomicGauge<uint64_t>> full_containers;
+
+  scoped_refptr<Counter> holes_punched;
 };
 
+#define MINIT(x) x(METRIC_log_block_manager_##x.Instantiate(metric_entity))
 #define GINIT(x) x(METRIC_log_block_manager_##x.Instantiate(metric_entity, 0))
 LogBlockManagerMetrics::LogBlockManagerMetrics(const scoped_refptr<MetricEntity>& metric_entity)
   : generic_metrics(metric_entity),
     GINIT(bytes_under_management),
     GINIT(blocks_under_management),
     GINIT(containers),
-    GINIT(full_containers) {
+    GINIT(full_containers),
+    MINIT(holes_punched) {
 }
 #undef GINIT
 
@@ -205,7 +216,7 @@ class LogBlock : public RefCountedThreadSafe<LogBlock> {
  public:
   LogBlock(LogBlockContainer* container, BlockId block_id, int64_t offset,
            int64_t length);
-  ~LogBlock();
+  ~LogBlock() = default;
 
   const BlockId& block_id() const { return block_id_; }
   LogBlockContainer* container() const { return container_; }
@@ -215,9 +226,9 @@ class LogBlock : public RefCountedThreadSafe<LogBlock> {
   // Returns a block's length aligned to the nearest filesystem block size.
   int64_t fs_aligned_length() const;
 
-  // Delete the block. Actual deletion takes place when the
-  // block is destructed.
-  void Delete();
+  // Registers the deletion of the block with a deletion transaction. Actual
+  // deletion will take place when the deletion transaction is destructed.
+  void RegisterDeletion(const shared_ptr<LogBlockDeletionTransaction>& transaction);
 
  private:
   // The owning container. Must outlive the LogBlock.
@@ -232,8 +243,8 @@ class LogBlock : public RefCountedThreadSafe<LogBlock> {
   // The block's length.
   const int64_t length_;
 
-  // Whether the block has been marked for deletion.
-  bool deleted_;
+  // The block deletion transaction with which this block has been registered.
+  shared_ptr<LogBlockDeletionTransaction> transaction_;
 
   DISALLOW_COPY_AND_ASSIGN(LogBlock);
 };
@@ -356,13 +367,16 @@ class LogBlockContainer {
   // If successful, adds all blocks to the block manager's in-memory maps.
   Status DoCloseBlocks(const vector<LogWritableBlock*>& blocks, SyncMode mode);
 
-  // Frees the space associated with a block at 'offset' and 'length'. This
-  // is a physical operation, not a logical one; a separate AppendMetadata()
-  // is required to record the deletion in container metadata.
+  // Frees the space associated with a block or a group of blocks at 'offset'
+  // and 'length'. This is a physical operation, not a logical one; a separate
+  // AppendMetadata() is required to record the deletion in container metadata.
   //
   // The on-disk effects of this call are made durable only after SyncData().
   Status PunchHole(int64_t offset, int64_t length);
 
+  // Executes a hole punching operation at 'offset' with the given 'length'.
+  void ContainerDeletionAsync(int64_t offset, int64_t length);
+
   // Preallocate enough space to ensure that an append of 'next_append_length'
   // can be satisfied by this container. The offset of the beginning of this
   // block must be provided in 'block_start_offset' (since container
@@ -1140,6 +1154,14 @@ void LogBlockContainer::SetReadOnly(const Status& error) {
   read_only_status_ = error;
 }
 
+void LogBlockContainer::ContainerDeletionAsync(int64_t offset, int64_t length) {
+  VLOG(3) << "Freeing space belonging to container " << ToString();
+  Status s = PunchHole(offset, length);
+  if (s.ok() && metrics_) metrics_->holes_punched->Increment();
+  WARN_NOT_OK(s, Substitute("could not delete blocks in container $0",
+                            data_dir()->dir()));
+}
+
 ///////////////////////////////////////////////////////////
 // LogBlockCreationTransaction
 ////////////////////////////////////////////////////////////
@@ -1196,19 +1218,34 @@ Status LogBlockCreationTransaction::CommitCreatedBlocks() {
 // LogBlockDeletionTransaction
 ////////////////////////////////////////////////////////////
 
-class LogBlockDeletionTransaction : public BlockDeletionTransaction {
+class LogBlockDeletionTransaction : public BlockDeletionTransaction,
+    public std::enable_shared_from_this<LogBlockDeletionTransaction> {
  public:
   explicit LogBlockDeletionTransaction(LogBlockManager* lbm)
       : lbm_(lbm) {
   }
 
-  virtual ~LogBlockDeletionTransaction() = default;
+  // Given the shared ownership of LogBlockDeletionTransaction, at this point
+  // all registered blocks should be destructed. Thus, coalescing deletions
+  // for blocks that are contiguous in a container and schedules hole punching.
+  virtual ~LogBlockDeletionTransaction();
 
   virtual void AddDeletedBlock(BlockId block) override;
 
   virtual Status CommitDeletedBlocks(std::vector<BlockId>* deleted) override;
 
+  // Add the given block that needs to be deleted to 'deleted_interval_map_',
+  // which keeps track of container and the range to be hole punched.
+  void AddBlock(const scoped_refptr<internal::LogBlock>& lb);
+
  private:
+  // Block <offset, offset + length> pair.
+  typedef std::pair<int64_t, int64_t> BlockInterval;
+
+  // Map used to aggregate BlockInterval instances across containers.
+  std::unordered_map<internal::LogBlockContainer*,
+      std::vector<BlockInterval>> deleted_interval_map_;
+
   // The owning LogBlockManager. Must outlive the LogBlockDeletionTransaction.
   LogBlockManager* lbm_;
   std::vector<BlockId> deleted_blocks_;
@@ -1219,16 +1256,36 @@ void LogBlockDeletionTransaction::AddDeletedBlock(BlockId block) {
   deleted_blocks_.emplace_back(block);
 }
 
+LogBlockDeletionTransaction::~LogBlockDeletionTransaction() {
+  for (auto& entry : deleted_interval_map_) {
+    LogBlockContainer* container = entry.first;
+    CHECK_OK_PREPEND(CoalesceIntervals<int64_t>(&entry.second),
+                     Substitute("could not coalesce hole punching for container: $0",
+                                container->ToString()));
+
+    for (const auto& interval : entry.second) {
+      container->ExecClosure(Bind(&LogBlockContainer::ContainerDeletionAsync,
+                                  Unretained(container),
+                                  interval.first,
+                                  interval.second - interval.first));
+    }
+  }
+}
+
 Status LogBlockDeletionTransaction::CommitDeletedBlocks(std::vector<BlockId>* deleted) {
   deleted->clear();
-  Status first_failure;
-  for (BlockId block : deleted_blocks_) {
-    Status s = lbm_->DeleteBlock(block);
-    // If we get NotFound, then the block was already deleted.
-    if (!s.ok() && !s.IsNotFound()) {
-      if (first_failure.ok()) first_failure = s;
-    } else {
-      deleted->emplace_back(block);
+  shared_ptr<LogBlockDeletionTransaction> transaction = shared_from_this();
+
+  vector<scoped_refptr<LogBlock>> log_blocks;
+  Status first_failure = lbm_->RemoveLogBlocks(deleted_blocks_, &log_blocks, deleted);
+  for (const auto& lb : log_blocks) {
+    // Register the block to be hole punched if metadata recording
+    // is successful.
+    lb->RegisterDeletion(transaction);
+    AddBlock(lb);
+
+    if (lbm_->metrics_) {
+      lbm_->metrics_->generic_metrics.total_blocks_deleted->Increment();
     }
   }
 
@@ -1240,6 +1297,14 @@ Status LogBlockDeletionTransaction::CommitDeletedBlocks(std::vector<BlockId>* de
   return first_failure;
 }
 
+void LogBlockDeletionTransaction::AddBlock(const scoped_refptr<internal::LogBlock>& lb) {
+  DCHECK_GE(lb->fs_aligned_length(), 0);
+
+  BlockInterval block_interval(lb->offset(),
+                               lb->offset() + lb->fs_aligned_length());
+  deleted_interval_map_[lb->container()].emplace_back(block_interval);
+}
+
 ////////////////////////////////////////////////////////////
 // LogBlock (definition)
 ////////////////////////////////////////////////////////////
@@ -1249,33 +1314,11 @@ LogBlock::LogBlock(LogBlockContainer* container, BlockId block_id,
     : container_(container),
       block_id_(block_id),
       offset_(offset),
-      length_(length),
-      deleted_(false) {
+      length_(length) {
   DCHECK_GE(offset, 0);
   DCHECK_GE(length, 0);
 }
 
-static void DeleteBlockAsync(LogBlockContainer* container,
-                             BlockId block_id,
-                             int64_t offset,
-                             int64_t fs_aligned_length) {
-  // We don't call SyncData() to synchronize the deletion because it's
-  // expensive, and in the worst case, we'll just leave orphaned data
-  // behind to be cleaned up in the next GC.
-  VLOG(3) << "Freeing space belonging to block " << block_id;
-  WARN_NOT_OK(container->PunchHole(offset, fs_aligned_length),
-              Substitute("Could not delete block $0", block_id.ToString()));
-}
-
-LogBlock::~LogBlock() {
-  if (deleted_) {
-    // Use the block's aligned length so that the filesystem can reclaim
-    // maximal disk space.
-    container_->ExecClosure(Bind(&DeleteBlockAsync, container_, block_id_,
-                                 offset_, fs_aligned_length()));
-  }
-}
-
 int64_t LogBlock::fs_aligned_length() const {
   uint64_t fs_block_size = container_->instance()->filesystem_block_size_bytes();
 
@@ -1294,9 +1337,12 @@ int64_t LogBlock::fs_aligned_length() const {
   return length_;
 }
 
-void LogBlock::Delete() {
-  DCHECK(!deleted_);
-  deleted_ = true;
+void LogBlock::RegisterDeletion(
+    const shared_ptr<LogBlockDeletionTransaction>& transaction) {
+  DCHECK(!transaction_);
+  DCHECK(transaction);
+
+  transaction_ = transaction;
 }
 
 ////////////////////////////////////////////////////////////
@@ -1374,7 +1420,11 @@ Status LogWritableBlock::Abort() {
 
   // DoCloseBlocks() has unlocked the container; it may be locked by someone else.
   // But block_manager_ is immutable, so this is safe.
-  return container_->block_manager()->DeleteBlock(id());
+  shared_ptr<BlockDeletionTransaction> deletion_transaction =
+      container_->block_manager()->NewDeletionTransaction();
+  deletion_transaction->AddDeletedBlock(id());
+  vector<BlockId> deleted;
+  return deletion_transaction->CommitDeletedBlocks(&deleted);
 }
 
 const BlockId& LogWritableBlock::id() const {
@@ -1463,6 +1513,7 @@ void LogWritableBlock::DoClose() {
     container_->metrics()->generic_metrics.blocks_open_writing->Decrement();
     container_->metrics()->generic_metrics.total_bytes_written->IncrementBy(
         block_length_);
+    container_->metrics()->generic_metrics.total_blocks_created->Increment();
   }
 
   // Finalize() was not called; this indicates we should
@@ -1826,60 +1877,6 @@ Status LogBlockManager::OpenBlock(const BlockId& block_id,
   return Status::OK();
 }
 
-Status LogBlockManager::DeleteBlock(const BlockId& block_id) {
-  CHECK(!read_only_);
-
-  // Deletion is forbidden for read-only container.
-  scoped_refptr<LogBlock> lb;
-  {
-    std::lock_guard<simple_spinlock> l(lock_);
-    auto it = blocks_by_block_id_.find(block_id);
-    if (it == blocks_by_block_id_.end()) {
-      return Status::NotFound("Can't find block", block_id.ToString());
-    }
-
-    lb = it->second;
-    HANDLE_DISK_FAILURE(lb->container()->read_only_status(),
-        error_manager_->RunErrorNotificationCb(lb->container()->data_dir()));
-
-    // Return early if deleting a block in a failed directory.
-    set<int> failed_dirs = dd_manager_->GetFailedDataDirs();
-    if (PREDICT_FALSE(!failed_dirs.empty())) {
-      int uuid_idx;
-      CHECK(dd_manager_->FindUuidIndexByDataDir(lb->container()->data_dir(), &uuid_idx));
-      if (ContainsKey(failed_dirs, uuid_idx)) {
-        LOG_EVERY_N(INFO, 10) << Substitute("Block $0 is in a failed directory; not deleting",
-                                            block_id.ToString());
-        return Status::IOError("Block is in a failed directory");
-      }
-    }
-    RemoveLogBlockUnlocked(it);
-  }
-
-  VLOG(3) << "Deleting block " << block_id;
-  lb->Delete();
-  lb->container()->BlockDeleted(lb);
-
-  // Record the on-disk deletion.
-  //
-  // TODO(unknown): what if this fails? Should we restore the in-memory block?
-  BlockRecordPB record;
-  block_id.CopyToPB(record.mutable_block_id());
-  record.set_op_type(DELETE);
-  record.set_timestamp_us(GetCurrentTimeMicros());
-  RETURN_NOT_OK_PREPEND(lb->container()->AppendMetadata(record),
-                        "Unable to append deletion record to block metadata");
-
-  // We don't bother fsyncing the metadata append for deletes in order to avoid
-  // the disk overhead. Even if we did fsync it, we'd still need to account for
-  // garbage at startup time (in the event that we crashed just before the
-  // fsync).
-  //
-  // TODO(KUDU-829): Implement GC of orphaned blocks.
-
-  return Status::OK();
-}
-
 unique_ptr<BlockCreationTransaction> LogBlockManager::NewCreationTransaction() {
   CHECK(!read_only_);
   return unique_ptr<internal::LogBlockCreationTransaction>(
@@ -2051,19 +2048,99 @@ bool LogBlockManager::AddLogBlockUnlocked(scoped_refptr<LogBlock> lb) {
   return true;
 }
 
-void LogBlockManager::RemoveLogBlockUnlocked(const BlockMap::iterator& it) {
-  scoped_refptr<LogBlock> lb = std::move(it->second);
-  blocks_by_block_id_.erase(it);
+Status LogBlockManager::RemoveLogBlocks(vector<BlockId> block_ids,
+                                        vector<scoped_refptr<LogBlock>>* log_blocks,
+                                        vector<BlockId>* deleted) {
+  Status first_failure;
+  vector<scoped_refptr<LogBlock>> lbs;
+  int64_t malloc_space = 0, blocks_length = 0;
+  {
+    std::lock_guard<simple_spinlock> l(lock_);
+    for (const auto& block_id : block_ids) {
+      scoped_refptr<LogBlock> lb;
+      Status s = RemoveLogBlockUnlocked(block_id, &lb);
+      // If we get NotFound, then the block was already deleted.
+      if (!s.ok() && !s.IsNotFound()) {
+        if (first_failure.ok()) first_failure = s;
+      } else if (s.ok()) {
+        malloc_space += kudu_malloc_usable_size(lb.get());
+        blocks_length += lb->length();
+        lbs.emplace_back(std::move(lb));
+      } else {
+        deleted->emplace_back(block_id);
+      }
+    }
+  }
 
-  VLOG(2) << Substitute("Removed block: offset $0, length $1",
-                        lb->offset(), lb->length());
+  // Update various metrics.
+  mem_tracker_->Release(malloc_space);
+  if (metrics()) {
+    metrics()->blocks_under_management->DecrementBy(lbs.size());
+    metrics()->bytes_under_management->DecrementBy(blocks_length);
+  }
 
-  mem_tracker_->Release(kudu_malloc_usable_size(lb.get()));
+  for (auto& lb : lbs) {
+    VLOG(3) << "Deleting block " << lb->block_id();
+    lb->container()->BlockDeleted(lb);
 
-  if (metrics()) {
-    metrics()->blocks_under_management->Decrement();
-    metrics()->bytes_under_management->DecrementBy(lb->length());
+    // Record the on-disk deletion.
+    //
+    // TODO(unknown): what if this fails? Should we restore the in-memory block?
+    BlockRecordPB record;
+    lb->block_id().CopyToPB(record.mutable_block_id());
+    record.set_op_type(DELETE);
+    record.set_timestamp_us(GetCurrentTimeMicros());
+    Status s = lb->container()->AppendMetadata(record);
+
+    // We don't bother fsyncing the metadata append for deletes in order to avoid
+    // the disk overhead. Even if we did fsync it, we'd still need to account for
+    // garbage at startup time (in the event that we crashed just before the
+    // fsync).
+    //
+    // TODO(KUDU-829): Implement GC of orphaned blocks.
+
+    if (!s.ok()) {
+      if (first_failure.ok()) {
+        first_failure = s.CloneAndPrepend(
+            "Unable to append deletion record to block metadata");
+      }
+    } else {
+      deleted->emplace_back(lb->block_id());
+      log_blocks->emplace_back(std::move(lb));
+    }
+  }
+
+  return first_failure;
+}
+
+Status LogBlockManager::RemoveLogBlockUnlocked(const BlockId& block_id,
+                                               scoped_refptr<internal::LogBlock>* lb) {
+  auto it = blocks_by_block_id_.find(block_id);
+  if (it == blocks_by_block_id_.end()) {
+    return Status::NotFound("Can't find block", block_id.ToString());
+  }
+
+  LogBlockContainer* container = it->second->container();
+  HANDLE_DISK_FAILURE(container->read_only_status(),
+                      error_manager_->RunErrorNotificationCb(container->data_dir()));
+
+  // Return early if deleting a block in a failed directory.
+  set<int> failed_dirs = dd_manager_->GetFailedDataDirs();
+  if (PREDICT_FALSE(!failed_dirs.empty())) {
+    int uuid_idx;
+    CHECK(dd_manager_->FindUuidIndexByDataDir(container->data_dir(), &uuid_idx));
+    if (ContainsKey(failed_dirs, uuid_idx)) {
+      LOG_EVERY_N(INFO, 10) << Substitute("Block $0 is in a failed directory; not deleting",
+                                          block_id.ToString());
+      return Status::IOError("Block is in a failed directory");
+    }
   }
+  *lb = std::move(it->second);
+  blocks_by_block_id_.erase(it);
+
+  VLOG(2) << Substitute("Removed block: offset $0, length $1",
+                        (*lb)->offset(), (*lb)->length());
+  return Status::OK();
 }
 
 void LogBlockManager::OpenDataDir(DataDir* dir,
@@ -2519,10 +2596,13 @@ Status LogBlockManager::Repair(
   // Repunch all requested holes. Any excess space reclaimed was already
   // tracked by LBMFullContainerSpaceCheck.
   //
-  // TODO(adar): can be more efficient (less fs work and more space reclamation
-  // in case of misaligned blocks) via hole coalescing first, but this is easy.
+  // Register deletions to a single BlockDeletionTransaction. So, the repunched
+  // holes belonging to the same container can be coalesced.
+  shared_ptr<LogBlockDeletionTransaction> transaction =
+      std::make_shared<LogBlockDeletionTransaction>(this);
   for (const auto& b : need_repunching) {
-    b->Delete();
+    b->RegisterDeletion(transaction);
+    transaction->AddBlock(b);
   }
 
   // Clearing this vector drops the last references to the LogBlocks within,

http://git-wip-us.apache.org/repos/asf/kudu/blob/15f3f9b2/src/kudu/fs/log_block_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.h b/src/kudu/fs/log_block_manager.h
index b37a62d..7118830 100644
--- a/src/kudu/fs/log_block_manager.h
+++ b/src/kudu/fs/log_block_manager.h
@@ -258,14 +258,6 @@ class LogBlockManager : public BlockManager {
       std::string,
       std::vector<BlockRecordPB>> BlockRecordsByContainerMap;
 
-  // Deletes an existing block, allowing its space to be reclaimed by the
-  // filesystem. The change is immediately made durable.
-  //
-  // Blocks may be deleted while they are open for reading or writing;
-  // the actual deletion will take place after the last open reader or
-  // writer is closed.
-  Status DeleteBlock(const BlockId& block_id);
-
   // Adds an as of yet unseen container to this block manager.
   //
   // Must be called with 'lock_' held.
@@ -317,8 +309,23 @@ class LogBlockManager : public BlockManager {
   // Returns true if the LogBlock was successfully added, false if it was already present.
   bool AddLogBlockUnlocked(scoped_refptr<internal::LogBlock> lb);
 
-  // Removes the given LogBlock from in-memory data structures. Must hold 'lock_'.
-  void RemoveLogBlockUnlocked(const BlockMap::iterator& it);
+  // Removes the given set of LogBlocks from in-memory data structures, and
+  // appends the block deletion metadata to record the on-disk deletion.
+  // The 'log_blocks' out parameter will be set with the LogBlocks that were
+  // successfully removed. The 'deleted' out parameter will be set with the
+  // blocks were already deleted, e.g encountered 'NotFound' error during removal.
+  //
+  // Returns the first deletion failure that was seen, if any.
+  Status RemoveLogBlocks(std::vector<BlockId> block_ids,
+                         std::vector<scoped_refptr<internal::LogBlock>>* log_blocks,
+                         std::vector<BlockId>* deleted);
+
+  // Removes a LogBlock from in-memory data structures. Must hold 'lock_'.
+  // The 'lb' out parameter will be set with the successfully deleted LogBlock.
+  //
+  // Returns an error of LogBlock cannot be successfully removed.
+  Status RemoveLogBlockUnlocked(const BlockId& block_id,
+                                scoped_refptr<internal::LogBlock>* lb);
 
   // Repairs any inconsistencies for 'dir' described in 'report'.
   //

http://git-wip-us.apache.org/repos/asf/kudu/blob/15f3f9b2/src/kudu/util/metrics.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/metrics.cc b/src/kudu/util/metrics.cc
index 323ea80..4954bac 100644
--- a/src/kudu/util/metrics.cc
+++ b/src/kudu/util/metrics.cc
@@ -94,6 +94,8 @@ const char* MetricUnit::Name(Type unit) {
       return "operations";
     case kBlocks:
       return "blocks";
+    case kHoles:
+      return "holes";
     case kLogBlockContainers:
       return "log block containers";
     case kTasks:

http://git-wip-us.apache.org/repos/asf/kudu/blob/15f3f9b2/src/kudu/util/metrics.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/metrics.h b/src/kudu/util/metrics.h
index fe0c635..60412c2 100644
--- a/src/kudu/util/metrics.h
+++ b/src/kudu/util/metrics.h
@@ -381,6 +381,7 @@ struct MetricUnit {
     kScanners,
     kMaintenanceOperations,
     kBlocks,
+    kHoles,
     kLogBlockContainers,
     kTasks,
     kMessages,