You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2019/06/06 02:14:15 UTC

[kudu] branch master updated: [tablet] Support accurate count of rows

This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new c1c8e25  [tablet] Support accurate count of rows
c1c8e25 is described below

commit c1c8e25bbc2e783446f1dd257cd99a16a79ccb16
Author: helifu <hz...@corp.netease.com>
AuthorDate: Wed Jun 5 19:58:31 2019 +0800

    [tablet] Support accurate count of rows
    
      A tablet consists of one MRS and a group of DRSs, and the
      group of DRSs makes up a RowSetTree. Thus, the number of live
      rows in a tablet comes from the MRS and the RowSetTree. At the
      same time, a new capability is added to the tablet superblock to
      ensure that only newly created tablets enable the capability.
    
    1. At the beginning, new rows will be written (insert/delete/
       reinsert) into MRS and counter1 holds the number of live rows;
    2. Next, MRS is being flushed to DRS (flush1):
       When MRS is being flushed, it will be attached to the RowSetTree,
       so we can count rows there (still counter1);
       When MRS is flushed to DRS, new RowSetMetadatas will be created
       and every one has persistent counter2;
    3. Then, there will be updates to DRS and they will be accumulated
       in DMS. counter3 holds the number of rows deleted in the DMS;
    4. If DMS is flushed (flush2):
       When DMS is being flushed, it will be attached to the Redo list,
       and its counter3 (the deleted row count) will be handed over to
       DeltaTracker;
       When DMS is flushed to Redo, the value of counter3 will be
       subtracted from the counter2 of RowSetMetadata;
    5. MinorCompact:
       No influence since any insertions or deletions come from memory;
    6. MajorCompact:
       No influence, just like step5;
    7. Compact:
       Just like step2.
    
                                __counter2(persistent)
                               /
                               |
                       __RowSetMetadata           __counter3
                      /                          /
           flush1     |                          |
    [MRS] -------> [ DRS ... ] -- Undo + Redo + DMS
      |                                   ^      |
      \__                                 |______|
         counter1                          flush2
    
    Change-Id: I2e6378e289bb85024c29e96c2b153fc417ed6412
    Reviewed-on: http://gerrit.cloudera.org:8080/13456
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
    Tested-by: Kudu Jenkins
---
 src/kudu/master/sys_catalog.cc              |  1 +
 src/kudu/tablet/compaction-test.cc          | 80 +++++++++++++++++++++++++++--
 src/kudu/tablet/compaction.cc               | 11 +++-
 src/kudu/tablet/delta_tracker.cc            | 14 ++++-
 src/kudu/tablet/delta_tracker.h             |  9 ++++
 src/kudu/tablet/deltamemstore-test.cc       | 22 ++++++++
 src/kudu/tablet/deltamemstore.cc            | 13 ++++-
 src/kudu/tablet/deltamemstore.h             |  6 +++
 src/kudu/tablet/diskrowset-test.cc          |  5 +-
 src/kudu/tablet/diskrowset.cc               | 17 ++++--
 src/kudu/tablet/diskrowset.h                |  9 +++-
 src/kudu/tablet/memrowset-test.cc           | 31 +++++++++++
 src/kudu/tablet/memrowset.cc                |  9 +++-
 src/kudu/tablet/memrowset.h                 | 10 ++++
 src/kudu/tablet/metadata.proto              |  8 +++
 src/kudu/tablet/mock-rowsets.h              |  4 ++
 src/kudu/tablet/rowset.cc                   |  9 ++++
 src/kudu/tablet/rowset.h                    |  5 ++
 src/kudu/tablet/rowset_metadata.cc          | 24 +++++++++
 src/kudu/tablet/rowset_metadata.h           | 18 ++++++-
 src/kudu/tablet/tablet-test-util.h          |  2 +-
 src/kudu/tablet/tablet-test.cc              | 28 ++++++++--
 src/kudu/tablet/tablet.cc                   | 19 +++++++
 src/kudu/tablet/tablet.h                    |  3 ++
 src/kudu/tablet/tablet_metadata.cc          | 22 ++++++--
 src/kudu/tablet/tablet_metadata.h           | 16 +++++-
 src/kudu/tools/kudu-tool-test.cc            |  2 +
 src/kudu/tserver/tablet_copy-test-base.h    |  2 +-
 src/kudu/tserver/tablet_copy_client-test.cc | 20 +++++++-
 src/kudu/tserver/tablet_copy_client.cc      |  1 +
 src/kudu/tserver/ts_tablet_manager.cc       |  1 +
 31 files changed, 388 insertions(+), 33 deletions(-)

diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index e90531f..a1410ae 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -252,6 +252,7 @@ Status SysCatalogTable::CreateNew(FsManager *fs_manager) {
                                                   partitions[0],
                                                   tablet::TABLET_DATA_READY,
                                                   /*tombstone_last_logged_opid=*/ boost::none,
+                                                  /*supports_live_row_count=*/ true,
                                                   &metadata));
 
   RaftConfigPB config;
diff --git a/src/kudu/tablet/compaction-test.cc b/src/kudu/tablet/compaction-test.cc
index e5bafea..fd0006d 100644
--- a/src/kudu/tablet/compaction-test.cc
+++ b/src/kudu/tablet/compaction-test.cc
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include "kudu/tablet/compaction.h"
+
 #include <algorithm>
 #include <cstdint>
 #include <cstdio>
@@ -52,7 +54,6 @@
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
-#include "kudu/tablet/compaction.h"
 #include "kudu/tablet/diskrowset.h"
 #include "kudu/tablet/local_tablet_writer.h"
 #include "kudu/tablet/memrowset.h"
@@ -232,10 +233,13 @@ class TestCompaction : public KuduRowSetTest {
   }
 
   void DeleteRows(RowSet* rowset, int n_rows) {
-    faststring update_buf;
+    DeleteRows(rowset, n_rows, 0);
+  }
+
+  void DeleteRows(RowSet* rowset, int n_rows, int delta) {
     for (uint32_t i = 0; i < n_rows; i++) {
       SCOPED_TRACE(i);
-      DeleteRow(rowset, i * 10);
+      DeleteRow(rowset, i * 10 + delta);
     }
   }
 
@@ -614,7 +618,7 @@ TEST_F(TestCompaction, TestDuplicatedGhostRowsMerging) {
   shared_ptr<DiskRowSet> rs3;
   {
     shared_ptr<MemRowSet> mrs;
-    ASSERT_OK(MemRowSet::Create(1, schema_, log_anchor_registry_.get(),
+    ASSERT_OK(MemRowSet::Create(2, schema_, log_anchor_registry_.get(),
                                 mem_trackers_.tablet_tracker, &mrs));
     InsertRows(mrs.get(), 10, 0);
     UpdateRows(mrs.get(), 10, 0, 2);
@@ -1183,5 +1187,73 @@ TEST_F(TestCompaction, TestEmptyFlushDoesntLeakBlocks) {
   ASSERT_EQ(after_block_ids, before_block_ids);
 }
 
+TEST_F(TestCompaction, TestCountLiveRowsOfMemRowSetFlush) {
+  shared_ptr<MemRowSet> mrs;
+  ASSERT_OK(MemRowSet::Create(0, schema_, log_anchor_registry_.get(),
+                              mem_trackers_.tablet_tracker, &mrs));
+  NO_FATALS(InsertRows(mrs.get(), 100, 0));
+  NO_FATALS(UpdateRows(mrs.get(), 80, 0, 1));
+  NO_FATALS(DeleteRows(mrs.get(), 50));
+  NO_FATALS(InsertRows(mrs.get(), 10, 0));
+  int64_t count = 0;
+  ASSERT_OK(mrs->CountLiveRows(&count));
+  ASSERT_EQ(100 - 50 + 10, count);
+
+  shared_ptr<DiskRowSet> rs;
+  NO_FATALS(FlushMRSAndReopenNoRoll(*mrs, schema_, &rs));
+  ASSERT_OK(rs->CountLiveRows(&count));
+  ASSERT_EQ(100 - 50 + 10, count);
+}
+
+TEST_F(TestCompaction, TestCountLiveRowsOfDiskRowSetsCompact) {
+  shared_ptr<DiskRowSet> rs1;
+  {
+    shared_ptr<MemRowSet> mrs;
+    ASSERT_OK(MemRowSet::Create(0, schema_, log_anchor_registry_.get(),
+                                mem_trackers_.tablet_tracker, &mrs));
+    NO_FATALS(InsertRows(mrs.get(), 100, 0));
+    NO_FATALS(UpdateRows(mrs.get(), 80, 0, 1));
+    NO_FATALS(DeleteRows(mrs.get(), 50, 0));
+    NO_FATALS(InsertRows(mrs.get(), 10, 0));
+    NO_FATALS(FlushMRSAndReopenNoRoll(*mrs, schema_, &rs1));
+  }
+  shared_ptr<DiskRowSet> rs2;
+  {
+    shared_ptr<MemRowSet> mrs;
+    ASSERT_OK(MemRowSet::Create(1, schema_, log_anchor_registry_.get(),
+                                mem_trackers_.tablet_tracker, &mrs));
+    NO_FATALS(InsertRows(mrs.get(), 100, 1));
+    NO_FATALS(UpdateRows(mrs.get(), 80, 1, 1));
+    NO_FATALS(DeleteRows(mrs.get(), 50, 1));
+    NO_FATALS(InsertRows(mrs.get(), 10, 1));
+    NO_FATALS(FlushMRSAndReopenNoRoll(*mrs, schema_, &rs2));
+  }
+  shared_ptr<DiskRowSet> rs3;
+  {
+    shared_ptr<MemRowSet> mrs;
+    ASSERT_OK(MemRowSet::Create(2, schema_, log_anchor_registry_.get(),
+                                mem_trackers_.tablet_tracker, &mrs));
+    NO_FATALS(InsertRows(mrs.get(), 100, 2));
+    NO_FATALS(UpdateRows(mrs.get(), 80, 2, 2));
+    NO_FATALS(DeleteRows(mrs.get(), 50, 2));
+    NO_FATALS(InsertRows(mrs.get(), 10, 2));
+    NO_FATALS(FlushMRSAndReopenNoRoll(*mrs, schema_, &rs3));
+  }
+
+  shared_ptr<DiskRowSet> result;
+  vector<shared_ptr<DiskRowSet>> all_rss;
+  all_rss.emplace_back(std::move(rs3));
+  all_rss.emplace_back(std::move(rs1));
+  all_rss.emplace_back(std::move(rs2));
+
+  SeedRandom();
+  std::random_shuffle(all_rss.begin(), all_rss.end());
+  NO_FATALS(CompactAndReopenNoRoll(all_rss, schema_, &result));
+
+  int64_t count = 0;
+  ASSERT_OK(result->CountLiveRows(&count));
+  ASSERT_EQ((100 - 50 + 10) * 3, count);
+}
+
 } // namespace tablet
 } // namespace kudu
diff --git a/src/kudu/tablet/compaction.cc b/src/kudu/tablet/compaction.cc
index a076302..b760776 100644
--- a/src/kudu/tablet/compaction.cc
+++ b/src/kudu/tablet/compaction.cc
@@ -1107,6 +1107,7 @@ Status FlushCompactionInput(CompactionInput* input,
     RETURN_NOT_OK(input->PrepareBlock(&rows));
 
     int n = 0;
+    int live_row_count = 0;
     for (int i = 0; i < rows.size(); i++) {
       CompactionInputRow* input_row = &rows[i];
       RETURN_NOT_OK(out->RollIfNecessary());
@@ -1162,19 +1163,25 @@ Status FlushCompactionInput(CompactionInput* input,
         out->AppendRedoDeltas(dst_row.row_index(), new_redos_head, &index_in_current_drs);
       }
 
+      // If the REDO is empty, it should not be a DELETE.
+      if (new_redos_head == nullptr) {
+        live_row_count++;
+      }
+
       DVLOG(4) << "Output Row: " << dst_row.schema()->DebugRow(dst_row)
                << "; RowId: " << index_in_current_drs;
 
       n++;
       if (n == block.nrows()) {
-        RETURN_NOT_OK(out->AppendBlock(block));
+        RETURN_NOT_OK(out->AppendBlock(block, live_row_count));
+        live_row_count = 0;
         n = 0;
       }
     }
 
     if (n > 0) {
       block.Resize(n);
-      RETURN_NOT_OK(out->AppendBlock(block));
+      RETURN_NOT_OK(out->AppendBlock(block, live_row_count));
       block.Resize(block.row_capacity());
     }
 
diff --git a/src/kudu/tablet/delta_tracker.cc b/src/kudu/tablet/delta_tracker.cc
index dd3c788..9400e32 100644
--- a/src/kudu/tablet/delta_tracker.cc
+++ b/src/kudu/tablet/delta_tracker.cc
@@ -95,7 +95,8 @@ DeltaTracker::DeltaTracker(shared_ptr<RowSetMetadata> rowset_metadata,
       read_only_(false),
       log_anchor_registry_(log_anchor_registry),
       mem_trackers_(std::move(mem_trackers)),
-      dms_empty_(true) {}
+      dms_empty_(true),
+      deleted_row_count_(0) {}
 
 Status DeltaTracker::OpenDeltaReaders(const vector<BlockId>& blocks,
                                       const IOContext* io_context,
@@ -717,6 +718,9 @@ Status DeltaTracker::FlushDMS(DeltaMemStore* dms,
                                             dfr));
   VLOG_WITH_PREFIX(1) << "Opened new delta block " << block_id.ToString() << " for read";
 
+  // Merge the deleted row count of the old DMS to the RowSetMetadata if necessary.
+  rowset_metadata_->IncrementLiveRows(-deleted_row_count_);
+
   RETURN_NOT_OK(rowset_metadata_->CommitRedoDeltaDataBlock(dms->id(), block_id));
   if (flush_type == FLUSH_METADATA) {
     RETURN_NOT_OK_PREPEND(rowset_metadata_->Flush(),
@@ -758,6 +762,7 @@ Status DeltaTracker::Flush(const IOContext* io_context, MetadataFlushType flush_
       return Status::OK();
     }
 
+    deleted_row_count_ = old_dms->deleted_row_count();
     redo_delta_stores_.push_back(old_dms);
   }
 
@@ -789,6 +794,7 @@ Status DeltaTracker::Flush(const IOContext* io_context, MetadataFlushType flush_
     CHECK_EQ(redo_delta_stores_[idx], old_dms)
         << "Another thread modified the delta store list during flush";
     redo_delta_stores_[idx] = dfr;
+    deleted_row_count_ = 0;
   }
 
   return Status::OK();
@@ -862,6 +868,12 @@ Status DeltaTracker::InitAllDeltaStoresForTests(WhichStores stores) {
   return Status::OK();
 }
 
+int64_t DeltaTracker::CountDeletedRows() const {
+  shared_lock<rw_spinlock> lock(component_lock_);
+  DCHECK_GE(deleted_row_count_, 0);
+  return deleted_row_count_ + dms_->deleted_row_count();
+}
+
 string DeltaTracker::LogPrefix() const {
   return Substitute("T $0 P $1: ",
                     rowset_metadata_->tablet_metadata()->tablet_id(),
diff --git a/src/kudu/tablet/delta_tracker.h b/src/kudu/tablet/delta_tracker.h
index 36aa59e..f1dabb5 100644
--- a/src/kudu/tablet/delta_tracker.h
+++ b/src/kudu/tablet/delta_tracker.h
@@ -266,6 +266,10 @@ class DeltaTracker {
   // Init() all of the specified delta stores. For tests only.
   Status InitAllDeltaStoresForTests(WhichStores stores);
 
+  // Count the number of deleted rows in the current DMS as well as
+  // in a flushing DMS (if one exists)
+  int64_t CountDeletedRows() const;
+
  private:
   FRIEND_TEST(TestRowSet, TestRowSetUpdate);
   FRIEND_TEST(TestRowSet, TestDMSFlush);
@@ -363,6 +367,11 @@ class DeltaTracker {
   // TODO(perf): this needs to be more fine grained
   mutable Mutex compact_flush_lock_;
 
+  // Number of deleted rows for a DMS that is currently being flushed.
+  // When the flush completes, this is merged into the RowSetMetadata
+  // and reset.
+  int64_t deleted_row_count_;
+
   DISALLOW_COPY_AND_ASSIGN(DeltaTracker);
 };
 
diff --git a/src/kudu/tablet/deltamemstore-test.cc b/src/kudu/tablet/deltamemstore-test.cc
index ba7adfe..5ea72a6 100644
--- a/src/kudu/tablet/deltamemstore-test.cc
+++ b/src/kudu/tablet/deltamemstore-test.cc
@@ -669,5 +669,27 @@ TEST_F(TestDeltaMemStore, TestFuzz) {
       /*test_filter_column_ids_and_collect_deltas=*/false));
 }
 
+TEST_F(TestDeltaMemStore, TestDeletedRowCount) {
+  const int kNumUpdates = 10000;
+
+  faststring buf;
+  RowChangeListEncoder update(&buf);
+  for (rowid_t row_idx = 0; row_idx < kNumUpdates; row_idx++) {
+    // UPDATE.
+    uint32_t new_val = row_idx;
+    update.Reset();
+    update.AddColumnUpdate(schema_.column(kIntColumn), schema_.column_id(kIntColumn), &new_val);
+    ASSERT_OK(dms_->Update(Timestamp(row_idx), row_idx, RowChangeList(buf), op_id_));
+
+    // DELETE.
+    if (row_idx % 2 == 0) {
+      update.Reset();
+      update.SetToDelete();
+      ASSERT_OK(dms_->Update(Timestamp(row_idx + 1), row_idx, RowChangeList(buf), op_id_));
+    }
+  }
+  ASSERT_EQ(kNumUpdates / 2, dms_->deleted_row_count());
+}
+
 } // namespace tablet
 } // namespace kudu
diff --git a/src/kudu/tablet/deltamemstore.cc b/src/kudu/tablet/deltamemstore.cc
index 6f65810..c5d2009 100644
--- a/src/kudu/tablet/deltamemstore.cc
+++ b/src/kudu/tablet/deltamemstore.cc
@@ -80,7 +80,8 @@ DeltaMemStore::DeltaMemStore(int64_t id,
     tree_(arena_),
     anchorer_(log_anchor_registry,
               Substitute("Rowset-$0/DeltaMemStore-$1", rs_id_, id_)),
-    disambiguator_sequence_number_(0) {
+    disambiguator_sequence_number_(0),
+    deleted_row_count_(0) {
 }
 
 Status DeltaMemStore::Init(const IOContext* /*io_context*/) {
@@ -118,6 +119,10 @@ Status DeltaMemStore::Update(Timestamp timestamp,
 
   anchorer_.AnchorIfMinimum(op_id.index());
 
+  if (update.is_delete()) {
+    deleted_row_count_.Increment();
+  }
+
   return Status::OK();
 }
 
@@ -191,6 +196,12 @@ void DeltaMemStore::DebugPrint() const {
   tree_.DebugPrint();
 }
 
+int64_t DeltaMemStore::deleted_row_count() const {
+  int64_t count = deleted_row_count_.Load();
+  DCHECK_GE(count, 0);
+  return count;
+}
+
 ////////////////////////////////////////////////////////////
 // DMSIterator
 ////////////////////////////////////////////////////////////
diff --git a/src/kudu/tablet/deltamemstore.h b/src/kudu/tablet/deltamemstore.h
index b741d85..01b942f 100644
--- a/src/kudu/tablet/deltamemstore.h
+++ b/src/kudu/tablet/deltamemstore.h
@@ -147,6 +147,9 @@ class DeltaMemStore : public DeltaStore,
     return delta_stats_;
   }
 
+  // Returns the number of deleted rows in this DMS.
+  int64_t deleted_row_count() const;
+
  private:
   friend class DMSIterator;
 
@@ -181,6 +184,9 @@ class DeltaMemStore : public DeltaStore,
   // number, and is only used in the case that such a collision occurs.
   AtomicInt<Atomic32> disambiguator_sequence_number_;
 
+  // Number of deleted rows in this DMS.
+  AtomicInt<int64_t> deleted_row_count_;
+
   DISALLOW_COPY_AND_ASSIGN(DeltaMemStore);
 };
 
diff --git a/src/kudu/tablet/diskrowset-test.cc b/src/kudu/tablet/diskrowset-test.cc
index 8f98be4..9155e4b 100644
--- a/src/kudu/tablet/diskrowset-test.cc
+++ b/src/kudu/tablet/diskrowset-test.cc
@@ -511,15 +511,18 @@ TEST_F(TestRowSet, TestRollingDiskRowSetWriter) {
   RollingDiskRowSetWriter writer(tablet()->metadata(), schema_,
                                  BloomFilterSizing::BySizeAndFPRate(32*1024, 0.01f),
                                  64 * 1024); // roll every 64KB
-  DoWriteTestRowSet(10000, &writer);
+  DoWriteTestRowSet(FLAGS_roundtrip_num_rows, &writer);
 
   // Should have rolled 4 times.
   vector<shared_ptr<RowSetMetadata> > metas;
   writer.GetWrittenRowSetMetadata(&metas);
   EXPECT_EQ(4, metas.size());
+  int64_t count = 0;
   for (const shared_ptr<RowSetMetadata>& meta : metas) {
     ASSERT_TRUE(meta->HasDataForColumnIdForTests(schema_.column_id(0)));
+    count += meta->live_row_count();
   }
+  ASSERT_EQ(FLAGS_roundtrip_num_rows, count);
 }
 
 TEST_F(TestRowSet, TestMakeDeltaIteratorMergerUnlocked) {
diff --git a/src/kudu/tablet/diskrowset.cc b/src/kudu/tablet/diskrowset.cc
index d6972cc..8143fc3 100644
--- a/src/kudu/tablet/diskrowset.cc
+++ b/src/kudu/tablet/diskrowset.cc
@@ -183,7 +183,7 @@ Status DiskRowSetWriter::InitAdHocIndexWriter() {
 
 }
 
-Status DiskRowSetWriter::AppendBlock(const RowBlock &block) {
+Status DiskRowSetWriter::AppendBlock(const RowBlock &block, int live_row_count) {
   DCHECK_EQ(block.schema()->num_columns(), schema_->num_columns());
   CHECK(!finished_);
 
@@ -201,6 +201,9 @@ Status DiskRowSetWriter::AppendBlock(const RowBlock &block) {
   // Write the batch to each of the columns
   RETURN_NOT_OK(col_writer_->AppendBlock(block));
 
+  // Increase the live row count if necessary.
+  rowset_metadata_->IncrementLiveRows(live_row_count);
+
 #ifndef NDEBUG
     faststring prev_key;
 #endif
@@ -383,9 +386,9 @@ Status RollingDiskRowSetWriter::RollIfNecessary() {
   return Status::OK();
 }
 
-Status RollingDiskRowSetWriter::AppendBlock(const RowBlock &block) {
+Status RollingDiskRowSetWriter::AppendBlock(const RowBlock &block, int live_row_count) {
   DCHECK_EQ(state_, kStarted);
-  RETURN_NOT_OK(cur_writer_->AppendBlock(block));
+  RETURN_NOT_OK(cur_writer_->AppendBlock(block, live_row_count));
 
   written_count_ += block.nrows();
 
@@ -470,7 +473,6 @@ Status RollingDiskRowSetWriter::FinishCurrentWriter() {
     }
 
     written_size_ += cur_writer_->written_size();
-
     written_drs_metas_.push_back(cur_drs_metadata_);
   }
 
@@ -479,7 +481,6 @@ Status RollingDiskRowSetWriter::FinishCurrentWriter() {
   cur_redo_writer_.reset(nullptr);
 
   cur_drs_metadata_.reset();
-
   return Status::OK();
 }
 
@@ -753,6 +754,12 @@ Status DiskRowSet::CountRows(const IOContext* io_context, rowid_t *count) const
   return Status::OK();
 }
 
+Status DiskRowSet::CountLiveRows(int64_t* count) const {
+  *count = rowset_metadata_->live_row_count() - delta_tracker_->CountDeletedRows();
+  DCHECK_GE(*count, 0);
+  return Status::OK();
+}
+
 Status DiskRowSet::GetBounds(std::string* min_encoded_key,
                              std::string* max_encoded_key) const {
   DCHECK(open_);
diff --git a/src/kudu/tablet/diskrowset.h b/src/kudu/tablet/diskrowset.h
index 65ed8b3..391d82d 100644
--- a/src/kudu/tablet/diskrowset.h
+++ b/src/kudu/tablet/diskrowset.h
@@ -100,7 +100,8 @@ class DiskRowSetWriter {
   // The block is written to all column writers as well as the bloom filter,
   // if configured.
   // Rows must be appended in ascending order.
-  Status AppendBlock(const RowBlock &block);
+  // 'live_row_count' means the number of live rows in this input block.
+  Status AppendBlock(const RowBlock &block, int live_row_count = 0);
 
   // Closes the CFiles and their underlying writable blocks.
   // If no rows were written, returns Status::Aborted().
@@ -180,7 +181,8 @@ class RollingDiskRowSetWriter {
   // you must append deltas using the APIs below *before* appending the block
   // of rows that they correspond to. This ensures that the output delta files
   // and data files are aligned.
-  Status AppendBlock(const RowBlock &block);
+  // 'live_row_count' means the number of live rows in this input block.
+  Status AppendBlock(const RowBlock &block, int live_row_count = 0);
 
   // Appends a sequence of REDO deltas for the same row to the current
   // redo delta file. 'row_idx_in_next_block' is the positional index after
@@ -367,6 +369,9 @@ class DiskRowSet : public RowSet {
   // yet set, consults the base data and stores the result in 'num_rows_'.
   Status CountRows(const fs::IOContext* io_context, rowid_t *count) const final override;
 
+  // Count the number of live rows in this DRS.
+  virtual Status CountLiveRows(int64_t* count) const override;
+
   // See RowSet::GetBounds(...)
   virtual Status GetBounds(std::string* min_encoded_key,
                            std::string* max_encoded_key) const override;
diff --git a/src/kudu/tablet/memrowset-test.cc b/src/kudu/tablet/memrowset-test.cc
index 1ddc8c9..f32002f 100644
--- a/src/kudu/tablet/memrowset-test.cc
+++ b/src/kudu/tablet/memrowset-test.cc
@@ -817,5 +817,36 @@ TEST_F(TestMemRowSet, TestMemRowSetUpdatePerformance) {
   }
 }
 
+TEST_F(TestMemRowSet, TestCountLiveRows) {
+  shared_ptr<MemRowSet> mrs;
+  ASSERT_OK(MemRowSet::Create(0, schema_, log_anchor_registry_.get(),
+                              MemTracker::GetRootTracker(), &mrs));
+
+  const auto CheckLiveRowsCount = [&](int64_t expect) {
+    int64_t count = 0;
+    ASSERT_OK(mrs->CountLiveRows(&count));
+    ASSERT_EQ(expect, count);
+  };
+
+  NO_FATALS(CheckLiveRowsCount(0));
+  ASSERT_OK(GenerateTestData(mrs.get()));
+  NO_FATALS(CheckLiveRowsCount(4));
+
+  ASSERT_OK(InsertRow(mrs.get(), "liverow 0", 0));
+  NO_FATALS(CheckLiveRowsCount(5));
+  ASSERT_OK(InsertRow(mrs.get(), "liverow 1", 0));
+  NO_FATALS(CheckLiveRowsCount(6));
+
+  OperationResultPB result;
+  ASSERT_OK(DeleteRow(mrs.get(), "liverow 0", &result));
+  NO_FATALS(CheckLiveRowsCount(5));
+
+  ASSERT_OK(InsertRow(mrs.get(), "liverow 0", 0));
+  NO_FATALS(CheckLiveRowsCount(6));
+
+  ASSERT_OK(UpdateRow(mrs.get(), "liverow 0", 1, &result));
+  NO_FATALS(CheckLiveRowsCount(6));
+}
+
 } // namespace tablet
 } // namespace kudu
diff --git a/src/kudu/tablet/memrowset.cc b/src/kudu/tablet/memrowset.cc
index 71904cd..a08d506 100644
--- a/src/kudu/tablet/memrowset.cc
+++ b/src/kudu/tablet/memrowset.cc
@@ -116,7 +116,8 @@ MemRowSet::MemRowSet(int64_t id,
     debug_insert_count_(0),
     debug_update_count_(0),
     anchorer_(log_anchor_registry, Substitute("MemRowSet-$0", id_)),
-    has_been_compacted_(false) {
+    has_been_compacted_(false),
+    live_row_count_(0) {
   CHECK(schema.has_column_ids());
   ANNOTATE_BENIGN_RACE(&debug_insert_count_, "insert count isnt accurate");
   ANNOTATE_BENIGN_RACE(&debug_update_count_, "update count isnt accurate");
@@ -189,6 +190,7 @@ Status MemRowSet::Insert(Timestamp timestamp,
   anchorer_.AnchorIfMinimum(op_id.index());
 
   debug_insert_count_++;
+  live_row_count_.Increment();
   return Status::OK();
 }
 
@@ -208,6 +210,8 @@ Status MemRowSet::Reinsert(Timestamp timestamp, const ConstContiguousRow& row, M
   // for the mutation are fully published before any concurrent reader sees
   // the appended mutation.
   mut->AppendToListAtomic(&ms_row->header_->redo_head, &ms_row->header_->redo_tail);
+
+  live_row_count_.Increment();
   return Status::OK();
 }
 
@@ -251,6 +255,9 @@ Status MemRowSet::MutateRow(Timestamp timestamp,
 
   anchorer_.AnchorIfMinimum(op_id.index());
   debug_update_count_++;
+  if (delta.is_delete()) {
+    live_row_count_.IncrementBy(-1);
+  }
   return Status::OK();
 }
 
diff --git a/src/kudu/tablet/memrowset.h b/src/kudu/tablet/memrowset.h
index bd57a14..752140d 100644
--- a/src/kudu/tablet/memrowset.h
+++ b/src/kudu/tablet/memrowset.h
@@ -40,6 +40,7 @@
 #include "kudu/tablet/concurrent_btree.h"
 #include "kudu/tablet/rowset.h"
 #include "kudu/tablet/rowset_metadata.h"
+#include "kudu/util/atomic.h"
 #include "kudu/util/faststring.h"
 #include "kudu/util/memory/arena.h"
 #include "kudu/util/monotime.h"
@@ -254,6 +255,12 @@ class MemRowSet : public RowSet,
     return Status::OK();
   }
 
+  virtual Status CountLiveRows(int64_t* count) const override {
+    *count = live_row_count_.Load();
+    DCHECK_GE(*count, 0);
+    return Status::OK();
+  }
+
   virtual Status GetBounds(std::string *min_encoded_key,
                            std::string *max_encoded_key) const override;
 
@@ -451,6 +458,9 @@ class MemRowSet : public RowSet,
   // and thus should not be scheduled for further compactions.
   std::atomic<bool> has_been_compacted_;
 
+  // Number of live rows in this MRS.
+  AtomicInt<int64_t> live_row_count_;
+
   DISALLOW_COPY_AND_ASSIGN(MemRowSet);
 };
 
diff --git a/src/kudu/tablet/metadata.proto b/src/kudu/tablet/metadata.proto
index 8905249..57ff067 100644
--- a/src/kudu/tablet/metadata.proto
+++ b/src/kudu/tablet/metadata.proto
@@ -47,6 +47,9 @@ message RowSetDataPB {
   optional BlockIdPB adhoc_index_block = 7;
   optional bytes min_encoded_key = 8;
   optional bytes max_encoded_key = 9;
+
+  // Number of live rows that have been persisted.
+  optional int64 live_row_count = 10;
 }
 
 // State flags indicating whether the tablet is in the middle of being copied
@@ -131,6 +134,11 @@ message TabletSuperBlockPB {
   // from a version of Kudu before 1.5.0. In this case, a new group will be
   // created spanning all data directories.
   optional DataDirGroupPB data_dir_group = 15;
+
+  // Whether the tablet supports counting live row. If false, 'live_row_count'
+  // may be inaccurate and should be ignored. It's only supported for the newly
+  // created ones, not for the ancient ones.
+  optional bool supports_live_row_count = 16;
 }
 
 // Tablet states represent stages of a TabletReplica's object lifecycle and are
diff --git a/src/kudu/tablet/mock-rowsets.h b/src/kudu/tablet/mock-rowsets.h
index 01576d5..c22d44d 100644
--- a/src/kudu/tablet/mock-rowsets.h
+++ b/src/kudu/tablet/mock-rowsets.h
@@ -70,6 +70,10 @@ class MockRowSet : public RowSet {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
+  virtual Status CountLiveRows(int64_t* /*count*/) const OVERRIDE {
+    LOG(FATAL) << "Unimplemented";
+    return Status::OK();
+  }
   virtual std::string ToString() const OVERRIDE {
     LOG(FATAL) << "Unimplemented";
     return "";
diff --git a/src/kudu/tablet/rowset.cc b/src/kudu/tablet/rowset.cc
index 78cb199..62c1131 100644
--- a/src/kudu/tablet/rowset.cc
+++ b/src/kudu/tablet/rowset.cc
@@ -230,6 +230,15 @@ Status DuplicatingRowSet::CountRows(const IOContext* io_context, rowid_t *count)
   return Status::OK();
 }
 
+Status DuplicatingRowSet::CountLiveRows(int64_t* count) const {
+  for (const shared_ptr<RowSet>& rs : old_rowsets_) {
+    int64_t tmp = 0;
+    RETURN_NOT_OK(rs->CountLiveRows(&tmp));
+    *count += tmp;
+  }
+  return Status::OK();
+}
+
 Status DuplicatingRowSet::GetBounds(string* min_encoded_key,
                                     string* max_encoded_key) const {
   // The range out of the output rowset always spans the full range
diff --git a/src/kudu/tablet/rowset.h b/src/kudu/tablet/rowset.h
index baecb4a..2cfccaf 100644
--- a/src/kudu/tablet/rowset.h
+++ b/src/kudu/tablet/rowset.h
@@ -163,6 +163,9 @@ class RowSet {
   // Count the number of rows in this rowset.
   virtual Status CountRows(const fs::IOContext* io_context, rowid_t *count) const = 0;
 
+  // Count the number of live rows in this rowset.
+  virtual Status CountLiveRows(int64_t* count) const = 0;
+
   // Return the bounds for this RowSet. 'min_encoded_key' and 'max_encoded_key'
   // are set to the first and last encoded keys for this RowSet.
   //
@@ -406,6 +409,8 @@ class DuplicatingRowSet : public RowSet {
 
   Status CountRows(const fs::IOContext* io_context, rowid_t *count) const OVERRIDE;
 
+  virtual Status CountLiveRows(int64_t* count) const OVERRIDE;
+
   virtual Status GetBounds(std::string* min_encoded_key,
                            std::string* max_encoded_key) const OVERRIDE;
 
diff --git a/src/kudu/tablet/rowset_metadata.cc b/src/kudu/tablet/rowset_metadata.cc
index 440ef93..6bd19ce 100644
--- a/src/kudu/tablet/rowset_metadata.cc
+++ b/src/kudu/tablet/rowset_metadata.cc
@@ -118,6 +118,11 @@ void RowSetMetadata::LoadFromPB(const RowSetDataPB& pb) {
   for (const DeltaDataPB& undo_delta_pb : pb.undo_deltas()) {
     undo_delta_blocks_.push_back(BlockId::FromPB(undo_delta_pb.block()));
   }
+
+  // Load live row count.
+  if (tablet_metadata_->supports_live_row_count()) {
+    live_row_count_ = pb.live_row_count();
+  }
 }
 
 void RowSetMetadata::ToProtobuf(RowSetDataPB *pb) {
@@ -163,6 +168,11 @@ void RowSetMetadata::ToProtobuf(RowSetDataPB *pb) {
     pb->set_min_encoded_key(*min_encoded_key_);
     pb->set_max_encoded_key(*max_encoded_key_);
   }
+
+  // Write the live row count.
+  if (tablet_metadata_->supports_live_row_count()) {
+    pb->set_live_row_count(live_row_count_);
+  }
 }
 
 const std::string RowSetMetadata::ToString() const {
@@ -268,6 +278,20 @@ void RowSetMetadata::CommitUpdate(const RowSetMetadataUpdate& update,
   blocks_by_col_id_.shrink_to_fit();
 }
 
+void RowSetMetadata::IncrementLiveRows(int64_t row_count) {
+  if (tablet_metadata_->supports_live_row_count() && row_count != 0) {
+    std::lock_guard<LockType> l(lock_);
+    live_row_count_ += row_count;
+    DCHECK_GE(live_row_count_, 0);
+  }
+}
+
+int64_t RowSetMetadata::live_row_count() const {
+  std::lock_guard<LockType> l(lock_);
+  DCHECK_GE(live_row_count_, 0);
+  return live_row_count_;
+}
+
 vector<BlockId> RowSetMetadata::GetAllBlocks() {
   vector<BlockId> blocks;
   std::lock_guard<LockType> l(lock_);
diff --git a/src/kudu/tablet/rowset_metadata.h b/src/kudu/tablet/rowset_metadata.h
index c0d156e..94af6f3 100644
--- a/src/kudu/tablet/rowset_metadata.h
+++ b/src/kudu/tablet/rowset_metadata.h
@@ -224,6 +224,15 @@ class RowSetMetadata {
 
   std::vector<BlockId> GetAllBlocks();
 
+  // Increase the row count.
+  // Note:
+  //   1) A positive number means plus the live rows,
+  //   2) A negative number means minus the deleted rows.
+  void IncrementLiveRows(int64_t row_count);
+
+  // Returns the number of live rows in this metadata.
+  int64_t live_row_count() const;
+
  private:
   friend class TabletMetadata;
 
@@ -232,7 +241,8 @@ class RowSetMetadata {
   explicit RowSetMetadata(TabletMetadata *tablet_metadata)
     : tablet_metadata_(tablet_metadata),
       initted_(false),
-      last_durable_redo_dms_id_(kNoDurableMemStore) {
+      last_durable_redo_dms_id_(kNoDurableMemStore),
+      live_row_count_(0) {
   }
 
   RowSetMetadata(TabletMetadata *tablet_metadata,
@@ -240,7 +250,8 @@ class RowSetMetadata {
     : tablet_metadata_(DCHECK_NOTNULL(tablet_metadata)),
       initted_(true),
       id_(id),
-      last_durable_redo_dms_id_(kNoDurableMemStore) {
+      last_durable_redo_dms_id_(kNoDurableMemStore),
+      live_row_count_(0) {
   }
 
   Status InitFromPB(const RowSetDataPB& pb);
@@ -266,6 +277,9 @@ class RowSetMetadata {
 
   int64_t last_durable_redo_dms_id_;
 
+  // Number of live rows on disk, excluding those in [MRS/DMS].
+  int64_t live_row_count_;
+
   DISALLOW_COPY_AND_ASSIGN(RowSetMetadata);
 };
 
diff --git a/src/kudu/tablet/tablet-test-util.h b/src/kudu/tablet/tablet-test-util.h
index 267a18c..62167d5 100644
--- a/src/kudu/tablet/tablet-test-util.h
+++ b/src/kudu/tablet/tablet-test-util.h
@@ -336,7 +336,7 @@ static Status WriteRow(const Slice &row_slice, RowSetWriterClass *writer) {
   RowBlockRow dst_row = block.row(0);
   RETURN_NOT_OK(CopyRow(row, &dst_row, static_cast<Arena*>(nullptr)));
 
-  return writer->AppendBlock(block);
+  return writer->AppendBlock(block, 1);
 }
 
 // Tracks encoded deltas and provides a DeltaIterator-like interface for
diff --git a/src/kudu/tablet/tablet-test.cc b/src/kudu/tablet/tablet-test.cc
index 4ffa081..3f46522 100644
--- a/src/kudu/tablet/tablet-test.cc
+++ b/src/kudu/tablet/tablet-test.cc
@@ -91,13 +91,18 @@ template<class SETUP>
 class TestTablet : public TabletTestBase<SETUP> {
   typedef SETUP Type;
 
- public:
+public:
   // Verify that iteration doesn't fail
   void CheckCanIterate() {
     vector<string> out_rows;
     ASSERT_OK(this->IterateToStringList(&out_rows));
   }
 
+  void CheckLiveRowsCount(int64_t expect) {
+    int64_t count = 0;
+    ASSERT_OK(this->tablet()->CountLiveRows(&count));
+    ASSERT_EQ(expect, count);
+  }
 };
 TYPED_TEST_CASE(TestTablet, TabletTestHelperTypes);
 
@@ -111,6 +116,7 @@ TYPED_TEST(TestTablet, TestFlush) {
   ASSERT_EQ(0, this->tablet()->OnDiskDataSize());
   ASSERT_GT(this->tablet()->OnDiskSize(), 0);
   ASSERT_EQ(this->tablet()->metadata()->on_disk_size(), this->tablet()->OnDiskSize());
+  NO_FATALS(this->CheckLiveRowsCount(max_rows));
 
   // Flush it.
   ASSERT_OK(this->tablet()->Flush());
@@ -120,6 +126,7 @@ TYPED_TEST(TestTablet, TestFlush) {
   // on-disk data size due to per-diskrowset metadata and tablet metadata.
   ASSERT_GT(this->tablet()->OnDiskDataSize(), 0);
   ASSERT_GT(this->tablet()->OnDiskSize(), this->tablet()->OnDiskDataSize());
+  NO_FATALS(this->CheckLiveRowsCount(max_rows));
 
   // Make sure the files were created as expected.
   RowSetMetadata* rowset_meta = tablet_meta->GetRowSetForTests(0);
@@ -210,6 +217,7 @@ TYPED_TEST(TestTablet, TestInsertsAndMutationsAreUndoneWithMVCCAfterFlush) {
   // now flush and the compact everything
   ASSERT_OK(this->tablet()->Flush());
   ASSERT_OK(this->tablet()->Compact(Tablet::FORCE_COMPACT_ALL));
+  NO_FATALS(this->CheckLiveRowsCount(4));
 
   // Now verify that with undos and redos we get the same thing.
   VerifySnapshotsHaveSameResult(this->tablet().get(), this->client_schema_,
@@ -244,6 +252,8 @@ TYPED_TEST(TestTablet, TestGhostRowsOnDiskRowSets) {
 
   // Should still be able to update, since the row is live.
   ASSERT_OK(this->UpdateTestRow(&writer, 0, 1));
+
+  NO_FATALS(this->CheckLiveRowsCount(1));
 }
 
 // Test that inserting a row which already exists causes an AlreadyPresent
@@ -259,15 +269,18 @@ TYPED_TEST(TestTablet, TestInsertDuplicateKey) {
   ASSERT_STR_CONTAINS(s.ToString(), "key already present");
 
   ASSERT_EQ(1, this->TabletCount());
+  NO_FATALS(this->CheckLiveRowsCount(1));
 
   // Flush, and make sure that inserting duplicate still fails
   ASSERT_OK(this->tablet()->Flush());
 
   ASSERT_EQ(1, this->TabletCount());
+  NO_FATALS(this->CheckLiveRowsCount(1));
 
   s = this->InsertTestRow(&writer, 12345, 0);
   ASSERT_STR_CONTAINS(s.ToString(), "key already present");
   ASSERT_EQ(1, this->TabletCount());
+  NO_FATALS(this->CheckLiveRowsCount(1));
 }
 
 // Tests that we are able to handle reinserts properly.
@@ -330,7 +343,7 @@ TYPED_TEST(TestTablet, TestReinserts) {
   ASSERT_STR_CONTAINS((*expected_rows[3])[0], "int32 key_idx=1, int32 val=1)");
   ASSERT_EQ(expected_rows[4]->size(), 0) << "Got the wrong result from snap: "
                                          << snaps[4].ToString();
-
+  NO_FATALS(this->CheckLiveRowsCount(0));
   STLDeleteElements(&expected_rows);
 }
 
@@ -372,6 +385,7 @@ TYPED_TEST(TestTablet, TestDeleteWithFlushAndCompact) {
   ASSERT_EQ(0L, writer.last_op_result().mutated_stores(0).dms_id());
   ASSERT_OK(this->IterateToStringList(&rows));
   ASSERT_EQ(0, rows.size());
+  NO_FATALS(this->CheckLiveRowsCount(0));
 
   // We now have an INSERT in the MemRowSet and the
   // deleted row in the DiskRowSet. The new version
@@ -392,6 +406,7 @@ TYPED_TEST(TestTablet, TestDeleteWithFlushAndCompact) {
   ASSERT_OK(this->IterateToStringList(&rows));
   ASSERT_EQ(1, rows.size());
   EXPECT_EQ(this->setup_.FormatDebugRow(0, 2, false), rows[0]);
+  NO_FATALS(this->CheckLiveRowsCount(1));
 }
 
 // Test flushes dealing with REINSERT mutations in the MemRowSet.
@@ -708,6 +723,7 @@ TYPED_TEST(TestTablet, TestInsertsPersist) {
 
   this->InsertTestRows(0, max_rows, 0);
   ASSERT_EQ(max_rows, this->TabletCount());
+  NO_FATALS(this->CheckLiveRowsCount(max_rows));
 
   // Get current timestamp.
   Timestamp t = this->tablet()->clock()->Now();
@@ -716,17 +732,19 @@ TYPED_TEST(TestTablet, TestInsertsPersist) {
   ASSERT_OK(this->tablet()->Flush());
 
   ASSERT_EQ(max_rows, this->TabletCount());
+  NO_FATALS(this->CheckLiveRowsCount(max_rows));
 
   // Close and re-open tablet.
-  // TODO: Should we be reopening the tablet in a different way to persist the
+  // TODO(unknown): Should we be reopening the tablet in a different way to persist the
   // clock / timestamps?
   this->TabletReOpen();
 
   // Ensure that rows exist
   ASSERT_EQ(max_rows, this->TabletCount());
+  NO_FATALS(this->CheckLiveRowsCount(max_rows));
   this->VerifyTestRowsWithTimestampAndVerifier(0, max_rows, t, boost::none);
 
-  // TODO: add some more data, re-flush
+  // TODO(unknown): add some more data, re-flush
 }
 
 TYPED_TEST(TestTablet, TestUpsert) {
@@ -754,6 +772,7 @@ TYPED_TEST(TestTablet, TestUpsert) {
   EXPECT_EQ(2, upserts_as_updates->value());
   ASSERT_OK(this->IterateToStringList(&rows));
   EXPECT_EQ(vector<string>{ this->setup_.FormatDebugRow(0, 1002, false) }, rows);
+  NO_FATALS(this->CheckLiveRowsCount(1));
 }
 
 
@@ -866,6 +885,7 @@ TYPED_TEST(TestTablet, TestCompaction) {
     // Compaction does not swap the memrowsets so we should still get 3
     ASSERT_EQ(3, this->tablet()->CurrentMrsIdForTests());
     ASSERT_EQ(n_rows * 3, this->TabletCount());
+    NO_FATALS(this->CheckLiveRowsCount(n_rows * 3));
 
     const RowSetMetadata *rowset_meta = this->tablet()->metadata()->GetRowSetForTests(3);
     ASSERT_TRUE(rowset_meta != nullptr);
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index daee9a5..9d5909a 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -1908,6 +1908,25 @@ Status Tablet::CountRows(uint64_t *count) const {
   return Status::OK();
 }
 
+Status Tablet::CountLiveRows(int64_t* count) const {
+  if (!metadata_->supports_live_row_count()) {
+    return Status::NotSupported("This tablet doesn't support live row counting");
+  }
+
+  scoped_refptr<TabletComponents> comps;
+  GetComponents(&comps);
+
+  int64_t ret = 0;
+  int64_t tmp = 0;
+  RETURN_NOT_OK(comps->memrowset->CountLiveRows(&ret));
+  for (const shared_ptr<RowSet>& rowset : comps->rowsets->all_rowsets()) {
+    RETURN_NOT_OK(rowset->CountLiveRows(&tmp));
+    ret += tmp;
+  }
+  *count = ret;
+  return Status::OK();
+}
+
 size_t Tablet::MemRowSetSize() const {
   scoped_refptr<TabletComponents> comps;
   GetComponents(&comps);
diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h
index c4a5691..8235daf 100644
--- a/src/kudu/tablet/tablet.h
+++ b/src/kudu/tablet/tablet.h
@@ -347,6 +347,9 @@ class Tablet {
   // memrowset in the current implementation.
   Status CountRows(uint64_t *count) const;
 
+  // Count the number of live rows in this tablet.
+  Status CountLiveRows(int64_t* count) const;
+
   // Verbosely dump this entire tablet to the logs. This is only
   // really useful when debugging unit tests failures where the tablet
   // has a very small number of rows.
diff --git a/src/kudu/tablet/tablet_metadata.cc b/src/kudu/tablet/tablet_metadata.cc
index 7c536f7..ec0d628 100644
--- a/src/kudu/tablet/tablet_metadata.cc
+++ b/src/kudu/tablet/tablet_metadata.cc
@@ -90,6 +90,7 @@ Status TabletMetadata::CreateNew(FsManager* fs_manager,
                                  const Partition& partition,
                                  const TabletDataState& initial_tablet_data_state,
                                  boost::optional<OpId> tombstone_last_logged_opid,
+                                 bool supports_live_row_count,
                                  scoped_refptr<TabletMetadata>* metadata) {
 
   // Verify that no existing tablet exists with the same ID.
@@ -110,7 +111,8 @@ Status TabletMetadata::CreateNew(FsManager* fs_manager,
                                                        partition_schema,
                                                        partition,
                                                        initial_tablet_data_state,
-                                                       std::move(tombstone_last_logged_opid)));
+                                                       std::move(tombstone_last_logged_opid),
+                                                       supports_live_row_count));
   RETURN_NOT_OK(ret->Flush());
   dir_group_cleanup.cancel();
 
@@ -149,7 +151,9 @@ Status TabletMetadata::LoadOrCreate(FsManager* fs_manager,
   if (s.IsNotFound()) {
     return CreateNew(fs_manager, tablet_id, table_name, table_id, schema,
                      partition_schema, partition, initial_tablet_data_state,
-                     std::move(tombstone_last_logged_opid), metadata);
+                     std::move(tombstone_last_logged_opid),
+                     /*supports_live_row_count=*/ true,
+                     metadata);
   }
   return s;
 }
@@ -266,7 +270,8 @@ TabletMetadata::TabletMetadata(FsManager* fs_manager, string tablet_id,
                                const Schema& schema, PartitionSchema partition_schema,
                                Partition partition,
                                const TabletDataState& tablet_data_state,
-                               boost::optional<OpId> tombstone_last_logged_opid)
+                               boost::optional<OpId> tombstone_last_logged_opid,
+                               bool supports_live_row_count)
     : state_(kNotWrittenYet),
       tablet_id_(std::move(tablet_id)),
       table_id_(std::move(table_id)),
@@ -283,7 +288,8 @@ TabletMetadata::TabletMetadata(FsManager* fs_manager, string tablet_id,
       num_flush_pins_(0),
       needs_flush_(false),
       flush_count_for_tests_(0),
-      pre_flush_callback_(Bind(DoNothingStatusClosure)) {
+      pre_flush_callback_(Bind(DoNothingStatusClosure)),
+      supports_live_row_count_(supports_live_row_count) {
   CHECK(schema_->has_column_ids());
   CHECK_GT(schema_->num_key_columns(), 0);
 }
@@ -302,7 +308,8 @@ TabletMetadata::TabletMetadata(FsManager* fs_manager, string tablet_id)
       num_flush_pins_(0),
       needs_flush_(false),
       flush_count_for_tests_(0),
-      pre_flush_callback_(Bind(DoNothingStatusClosure)) {}
+      pre_flush_callback_(Bind(DoNothingStatusClosure)),
+      supports_live_row_count_(false) {}
 
 Status TabletMetadata::LoadFromDisk() {
   TRACE_EVENT1("tablet", "TabletMetadata::LoadFromDisk",
@@ -386,6 +393,9 @@ Status TabletMetadata::LoadFromSuperBlock(const TabletSuperBlockPB& superblock)
 
     tablet_data_state_ = superblock.tablet_data_state();
 
+    // This field should be parsed before parsing RowSetDataPB.
+    supports_live_row_count_ = superblock.supports_live_row_count();
+
     rowsets_.clear();
     for (const RowSetDataPB& rowset_pb : superblock.rowsets()) {
       unique_ptr<RowSetMetadata> rowset_meta;
@@ -677,6 +687,8 @@ Status TabletMetadata::ToSuperBlockUnlocked(TabletSuperBlockPB* super_block,
     pb.mutable_data_dir_group()->Swap(&group_pb);
   }
 
+  pb.set_supports_live_row_count(supports_live_row_count_);
+
   super_block->Swap(&pb);
   return Status::OK();
 }
diff --git a/src/kudu/tablet/tablet_metadata.h b/src/kudu/tablet/tablet_metadata.h
index adf0d4c..1462950 100644
--- a/src/kudu/tablet/tablet_metadata.h
+++ b/src/kudu/tablet/tablet_metadata.h
@@ -81,6 +81,7 @@ class TabletMetadata : public RefCountedThreadSafe<TabletMetadata> {
                           const Partition& partition,
                           const TabletDataState& initial_tablet_data_state,
                           boost::optional<consensus::OpId> tombstone_last_logged_opid,
+                          bool supports_live_row_count,
                           scoped_refptr<TabletMetadata>* metadata);
 
   // Load existing metadata from disk.
@@ -263,6 +264,15 @@ class TabletMetadata : public RefCountedThreadSafe<TabletMetadata> {
     return flush_count_for_tests_;
   }
 
+  // For testing only.
+  void set_supports_live_row_count_for_tests(bool supports_live_row_count) {
+    supports_live_row_count_ = supports_live_row_count;
+  }
+
+  bool supports_live_row_count() const {
+    return supports_live_row_count_;
+  }
+
  private:
   friend class RefCountedThreadSafe<TabletMetadata>;
   friend class MetadataTest;
@@ -279,7 +289,8 @@ class TabletMetadata : public RefCountedThreadSafe<TabletMetadata> {
                  const Schema& schema, PartitionSchema partition_schema,
                  Partition partition,
                  const TabletDataState& tablet_data_state,
-                 boost::optional<consensus::OpId> tombstone_last_logged_opid);
+                 boost::optional<consensus::OpId> tombstone_last_logged_opid,
+                 bool supports_live_row_count);
 
   // Constructor for loading an existing tablet.
   TabletMetadata(FsManager* fs_manager, std::string tablet_id);
@@ -387,6 +398,9 @@ class TabletMetadata : public RefCountedThreadSafe<TabletMetadata> {
   // call to Flush() or LoadFromDisk().
   std::atomic<int64_t> on_disk_size_;
 
+  // The tablet supports live row counting if true.
+  bool supports_live_row_count_;
+
   DISALLOW_COPY_AND_ASSIGN(TabletMetadata);
 };
 
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 471ef56..65829e9 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -1677,6 +1677,7 @@ TEST_F(ToolTest, TestLocalReplicaDumpDataDirs) {
       kSchemaWithIds, partition.first, partition.second,
       tablet::TABLET_DATA_READY,
       /*tombstone_last_logged_opid=*/ boost::none,
+      /*supports_live_row_count=*/ true,
       &meta));
   string stdout;
   NO_FATALS(RunActionStdoutString(Substitute("local_replica dump data_dirs $0 "
@@ -1711,6 +1712,7 @@ TEST_F(ToolTest, TestLocalReplicaDumpMeta) {
                   kSchemaWithIds, partition.first, partition.second,
                   tablet::TABLET_DATA_READY,
                   /*tombstone_last_logged_opid=*/ boost::none,
+                  /*supports_live_row_count=*/ true,
                   &meta);
   string stdout;
   NO_FATALS(RunActionStdoutString(Substitute("local_replica dump meta $0 "
diff --git a/src/kudu/tserver/tablet_copy-test-base.h b/src/kudu/tserver/tablet_copy-test-base.h
index b4db99c..7319a6c 100644
--- a/src/kudu/tserver/tablet_copy-test-base.h
+++ b/src/kudu/tserver/tablet_copy-test-base.h
@@ -110,7 +110,7 @@ class TabletCopyTest : public TabletServerTestBase {
 
   // Generate the test data for the tablet and do the flushing we assume will be
   // done in the unit tests for tablet copy.
-  void GenerateTestData() {
+  virtual void GenerateTestData() {
     const int kIncr = 50;
     LOG_TIMING(INFO, "Loading test data") {
       for (int row_id = 0; row_id < kNumLogRolls * kIncr; row_id += kIncr) {
diff --git a/src/kudu/tserver/tablet_copy_client-test.cc b/src/kudu/tserver/tablet_copy_client-test.cc
index 8c880ce..5dd9f4e 100644
--- a/src/kudu/tserver/tablet_copy_client-test.cc
+++ b/src/kudu/tserver/tablet_copy_client-test.cc
@@ -14,7 +14,7 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-#include "kudu/tserver/tablet_copy-test-base.h"
+#include "kudu/tserver/tablet_copy_client.h"
 
 #include <cstdint>
 #include <limits>
@@ -53,8 +53,8 @@
 #include "kudu/tablet/metadata.pb.h"
 #include "kudu/tablet/tablet_metadata.h"
 #include "kudu/tablet/tablet_replica.h"
+#include "kudu/tserver/tablet_copy-test-base.h"
 #include "kudu/tserver/tablet_copy.pb.h"
-#include "kudu/tserver/tablet_copy_client.h"
 #include "kudu/util/crc.h"
 #include "kudu/util/env.h"
 #include "kudu/util/env_util.h"
@@ -63,9 +63,11 @@
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/path_util.h"
+#include "kudu/util/random.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
 
 using std::shared_ptr;
 using std::string;
@@ -143,6 +145,14 @@ class TabletCopyClientTest : public TabletCopyTest {
  protected:
   Status CompareFileContents(const string& path1, const string& path2);
 
+  // Overrides GenerateTestData() to randomly set 'supports_live_row_count' before loading data.
+  virtual void GenerateTestData() {
+    Random rand(SeedRandom());
+    NO_FATALS(tablet_replica_->tablet_metadata()->
+        set_supports_live_row_count_for_tests(rand.Next() % 2));
+    NO_FATALS(TabletCopyTest::GenerateTestData());
+  }
+
   MetricRegistry metric_registry_;
   scoped_refptr<MetricEntity> metric_entity_;
   unique_ptr<FsManager> fs_manager_;
@@ -401,6 +411,12 @@ TEST_F(TabletCopyClientTest, TestFailedDiskStopsClient) {
   ASSERT_TRUE(s.IsIOError());
 }
 
+TEST_F(TabletCopyClientTest, TestSupportsLiveRowCount) {
+  ASSERT_OK(StartCopy());
+  ASSERT_EQ(meta_->supports_live_row_count(),
+      tablet_replica_->tablet_metadata()->supports_live_row_count());
+}
+
 enum DownloadBlocks {
   kDownloadBlocks,    // Fetch blocks from remote.
   kNoDownloadBlocks,  // Do not fetch blocks from remote.
diff --git a/src/kudu/tserver/tablet_copy_client.cc b/src/kudu/tserver/tablet_copy_client.cc
index 6c3669c..f4fbe48 100644
--- a/src/kudu/tserver/tablet_copy_client.cc
+++ b/src/kudu/tserver/tablet_copy_client.cc
@@ -360,6 +360,7 @@ Status TabletCopyClient::Start(const HostPort& copy_source_addr,
                                             partition,
                                             superblock_->tablet_data_state(),
                                             superblock_->tombstone_last_logged_opid(),
+                                            remote_superblock_->supports_live_row_count(),
                                             &meta_));
     TRACE("Wrote new tablet metadata");
 
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index 9e83b50..9531f09 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -442,6 +442,7 @@ Status TSTabletManager::CreateNewTablet(const string& table_id,
                               partition,
                               TABLET_DATA_READY,
                               boost::none,
+                              /*supports_live_row_count=*/ true,
                               &meta),
     "Couldn't create tablet metadata");