You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2019/06/06 02:14:15 UTC
[kudu] branch master updated: [tablet] Support accurate count of
rows
This is an automated email from the ASF dual-hosted git repository.
awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new c1c8e25 [tablet] Support accurate count of rows
c1c8e25 is described below
commit c1c8e25bbc2e783446f1dd257cd99a16a79ccb16
Author: helifu <hz...@corp.netease.com>
AuthorDate: Wed Jun 5 19:58:31 2019 +0800
[tablet] Support accurate count of rows
A tablet consists of one MRS and a group of DRSs, and the group
of DRSs makes up a RowSetTree. Thus, the number of live rows in
a tablet is the sum of the live rows in the MRS and in the
RowSetTree. In addition, a new flag is added to the tablet superblock
so that only newly created tablets enable this capability.
1. At the beginning, new rows will be written (insert/delete/
reinsert) into MRS and counter1 holds the number of live rows;
2. Next, MRS is being flushed to DRS (flush1):
When MRS is being flushed, it will be attached to the RowSetTree,
so we can count rows there (still counter1);
When MRS is flushed to DRS, new RowSetMetadatas will be created
and every one has persistent counter2;
3. Then, there will be updates to DRS and they will be accumulated
in DMS. counter3 holds the number of deleted rows in the DMS;
4. If DMS is flushed (flush2):
When DMS is being flushed, it will be attached to the Redo list,
and its counter3 will be swapped to DeltaTracker;
When DMS is flushed to Redo, the value of counter3 will be subtracted
from the counter2 of RowSetMetadata;
5. MinorCompact:
No influence since any insertions or deletions come from memory;
6. MajorCompact:
No influence, just like step5;
7. Compact:
Just like step2.
                                     __counter2(persistent)
                                    /
                                   |
                   __RowSetMetadata           __counter3
                  /                          /
      flush1     |                          |
[MRS] -------> [ DRS ... ] -- Undo + Redo + DMS
  |                                   ^      |
  \__                                 |______|
     counter1                          flush2
Change-Id: I2e6378e289bb85024c29e96c2b153fc417ed6412
Reviewed-on: http://gerrit.cloudera.org:8080/13456
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Kudu Jenkins
---
src/kudu/master/sys_catalog.cc | 1 +
src/kudu/tablet/compaction-test.cc | 80 +++++++++++++++++++++++++++--
src/kudu/tablet/compaction.cc | 11 +++-
src/kudu/tablet/delta_tracker.cc | 14 ++++-
src/kudu/tablet/delta_tracker.h | 9 ++++
src/kudu/tablet/deltamemstore-test.cc | 22 ++++++++
src/kudu/tablet/deltamemstore.cc | 13 ++++-
src/kudu/tablet/deltamemstore.h | 6 +++
src/kudu/tablet/diskrowset-test.cc | 5 +-
src/kudu/tablet/diskrowset.cc | 17 ++++--
src/kudu/tablet/diskrowset.h | 9 +++-
src/kudu/tablet/memrowset-test.cc | 31 +++++++++++
src/kudu/tablet/memrowset.cc | 9 +++-
src/kudu/tablet/memrowset.h | 10 ++++
src/kudu/tablet/metadata.proto | 8 +++
src/kudu/tablet/mock-rowsets.h | 4 ++
src/kudu/tablet/rowset.cc | 9 ++++
src/kudu/tablet/rowset.h | 5 ++
src/kudu/tablet/rowset_metadata.cc | 24 +++++++++
src/kudu/tablet/rowset_metadata.h | 18 ++++++-
src/kudu/tablet/tablet-test-util.h | 2 +-
src/kudu/tablet/tablet-test.cc | 28 ++++++++--
src/kudu/tablet/tablet.cc | 19 +++++++
src/kudu/tablet/tablet.h | 3 ++
src/kudu/tablet/tablet_metadata.cc | 22 ++++++--
src/kudu/tablet/tablet_metadata.h | 16 +++++-
src/kudu/tools/kudu-tool-test.cc | 2 +
src/kudu/tserver/tablet_copy-test-base.h | 2 +-
src/kudu/tserver/tablet_copy_client-test.cc | 20 +++++++-
src/kudu/tserver/tablet_copy_client.cc | 1 +
src/kudu/tserver/ts_tablet_manager.cc | 1 +
31 files changed, 388 insertions(+), 33 deletions(-)
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index e90531f..a1410ae 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -252,6 +252,7 @@ Status SysCatalogTable::CreateNew(FsManager *fs_manager) {
partitions[0],
tablet::TABLET_DATA_READY,
/*tombstone_last_logged_opid=*/ boost::none,
+ /*supports_live_row_count=*/ true,
&metadata));
RaftConfigPB config;
diff --git a/src/kudu/tablet/compaction-test.cc b/src/kudu/tablet/compaction-test.cc
index e5bafea..fd0006d 100644
--- a/src/kudu/tablet/compaction-test.cc
+++ b/src/kudu/tablet/compaction-test.cc
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+#include "kudu/tablet/compaction.h"
+
#include <algorithm>
#include <cstdint>
#include <cstdio>
@@ -52,7 +54,6 @@
#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/substitute.h"
-#include "kudu/tablet/compaction.h"
#include "kudu/tablet/diskrowset.h"
#include "kudu/tablet/local_tablet_writer.h"
#include "kudu/tablet/memrowset.h"
@@ -232,10 +233,13 @@ class TestCompaction : public KuduRowSetTest {
}
void DeleteRows(RowSet* rowset, int n_rows) {
- faststring update_buf;
+ DeleteRows(rowset, n_rows, 0);
+ }
+
+ void DeleteRows(RowSet* rowset, int n_rows, int delta) {
for (uint32_t i = 0; i < n_rows; i++) {
SCOPED_TRACE(i);
- DeleteRow(rowset, i * 10);
+ DeleteRow(rowset, i * 10 + delta);
}
}
@@ -614,7 +618,7 @@ TEST_F(TestCompaction, TestDuplicatedGhostRowsMerging) {
shared_ptr<DiskRowSet> rs3;
{
shared_ptr<MemRowSet> mrs;
- ASSERT_OK(MemRowSet::Create(1, schema_, log_anchor_registry_.get(),
+ ASSERT_OK(MemRowSet::Create(2, schema_, log_anchor_registry_.get(),
mem_trackers_.tablet_tracker, &mrs));
InsertRows(mrs.get(), 10, 0);
UpdateRows(mrs.get(), 10, 0, 2);
@@ -1183,5 +1187,73 @@ TEST_F(TestCompaction, TestEmptyFlushDoesntLeakBlocks) {
ASSERT_EQ(after_block_ids, before_block_ids);
}
+TEST_F(TestCompaction, TestCountLiveRowsOfMemRowSetFlush) {
+ shared_ptr<MemRowSet> mrs;
+ ASSERT_OK(MemRowSet::Create(0, schema_, log_anchor_registry_.get(),
+ mem_trackers_.tablet_tracker, &mrs));
+ NO_FATALS(InsertRows(mrs.get(), 100, 0));
+ NO_FATALS(UpdateRows(mrs.get(), 80, 0, 1));
+ NO_FATALS(DeleteRows(mrs.get(), 50));
+ NO_FATALS(InsertRows(mrs.get(), 10, 0));
+ int64_t count = 0;
+ ASSERT_OK(mrs->CountLiveRows(&count));
+ ASSERT_EQ(100 - 50 + 10, count);
+
+ shared_ptr<DiskRowSet> rs;
+ NO_FATALS(FlushMRSAndReopenNoRoll(*mrs, schema_, &rs));
+ ASSERT_OK(rs->CountLiveRows(&count));
+ ASSERT_EQ(100 - 50 + 10, count);
+}
+
+TEST_F(TestCompaction, TestCountLiveRowsOfDiskRowSetsCompact) {
+ shared_ptr<DiskRowSet> rs1;
+ {
+ shared_ptr<MemRowSet> mrs;
+ ASSERT_OK(MemRowSet::Create(0, schema_, log_anchor_registry_.get(),
+ mem_trackers_.tablet_tracker, &mrs));
+ NO_FATALS(InsertRows(mrs.get(), 100, 0));
+ NO_FATALS(UpdateRows(mrs.get(), 80, 0, 1));
+ NO_FATALS(DeleteRows(mrs.get(), 50, 0));
+ NO_FATALS(InsertRows(mrs.get(), 10, 0));
+ NO_FATALS(FlushMRSAndReopenNoRoll(*mrs, schema_, &rs1));
+ }
+ shared_ptr<DiskRowSet> rs2;
+ {
+ shared_ptr<MemRowSet> mrs;
+ ASSERT_OK(MemRowSet::Create(1, schema_, log_anchor_registry_.get(),
+ mem_trackers_.tablet_tracker, &mrs));
+ NO_FATALS(InsertRows(mrs.get(), 100, 1));
+ NO_FATALS(UpdateRows(mrs.get(), 80, 1, 1));
+ NO_FATALS(DeleteRows(mrs.get(), 50, 1));
+ NO_FATALS(InsertRows(mrs.get(), 10, 1));
+ NO_FATALS(FlushMRSAndReopenNoRoll(*mrs, schema_, &rs2));
+ }
+ shared_ptr<DiskRowSet> rs3;
+ {
+ shared_ptr<MemRowSet> mrs;
+ ASSERT_OK(MemRowSet::Create(2, schema_, log_anchor_registry_.get(),
+ mem_trackers_.tablet_tracker, &mrs));
+ NO_FATALS(InsertRows(mrs.get(), 100, 2));
+ NO_FATALS(UpdateRows(mrs.get(), 80, 2, 2));
+ NO_FATALS(DeleteRows(mrs.get(), 50, 2));
+ NO_FATALS(InsertRows(mrs.get(), 10, 2));
+ NO_FATALS(FlushMRSAndReopenNoRoll(*mrs, schema_, &rs3));
+ }
+
+ shared_ptr<DiskRowSet> result;
+ vector<shared_ptr<DiskRowSet>> all_rss;
+ all_rss.emplace_back(std::move(rs3));
+ all_rss.emplace_back(std::move(rs1));
+ all_rss.emplace_back(std::move(rs2));
+
+ SeedRandom();
+ std::random_shuffle(all_rss.begin(), all_rss.end());
+ NO_FATALS(CompactAndReopenNoRoll(all_rss, schema_, &result));
+
+ int64_t count = 0;
+ ASSERT_OK(result->CountLiveRows(&count));
+ ASSERT_EQ((100 - 50 + 10) * 3, count);
+}
+
} // namespace tablet
} // namespace kudu
diff --git a/src/kudu/tablet/compaction.cc b/src/kudu/tablet/compaction.cc
index a076302..b760776 100644
--- a/src/kudu/tablet/compaction.cc
+++ b/src/kudu/tablet/compaction.cc
@@ -1107,6 +1107,7 @@ Status FlushCompactionInput(CompactionInput* input,
RETURN_NOT_OK(input->PrepareBlock(&rows));
int n = 0;
+ int live_row_count = 0;
for (int i = 0; i < rows.size(); i++) {
CompactionInputRow* input_row = &rows[i];
RETURN_NOT_OK(out->RollIfNecessary());
@@ -1162,19 +1163,25 @@ Status FlushCompactionInput(CompactionInput* input,
out->AppendRedoDeltas(dst_row.row_index(), new_redos_head, &index_in_current_drs);
}
+ // If the REDO is empty, it should not be a DELETE.
+ if (new_redos_head == nullptr) {
+ live_row_count++;
+ }
+
DVLOG(4) << "Output Row: " << dst_row.schema()->DebugRow(dst_row)
<< "; RowId: " << index_in_current_drs;
n++;
if (n == block.nrows()) {
- RETURN_NOT_OK(out->AppendBlock(block));
+ RETURN_NOT_OK(out->AppendBlock(block, live_row_count));
+ live_row_count = 0;
n = 0;
}
}
if (n > 0) {
block.Resize(n);
- RETURN_NOT_OK(out->AppendBlock(block));
+ RETURN_NOT_OK(out->AppendBlock(block, live_row_count));
block.Resize(block.row_capacity());
}
diff --git a/src/kudu/tablet/delta_tracker.cc b/src/kudu/tablet/delta_tracker.cc
index dd3c788..9400e32 100644
--- a/src/kudu/tablet/delta_tracker.cc
+++ b/src/kudu/tablet/delta_tracker.cc
@@ -95,7 +95,8 @@ DeltaTracker::DeltaTracker(shared_ptr<RowSetMetadata> rowset_metadata,
read_only_(false),
log_anchor_registry_(log_anchor_registry),
mem_trackers_(std::move(mem_trackers)),
- dms_empty_(true) {}
+ dms_empty_(true),
+ deleted_row_count_(0) {}
Status DeltaTracker::OpenDeltaReaders(const vector<BlockId>& blocks,
const IOContext* io_context,
@@ -717,6 +718,9 @@ Status DeltaTracker::FlushDMS(DeltaMemStore* dms,
dfr));
VLOG_WITH_PREFIX(1) << "Opened new delta block " << block_id.ToString() << " for read";
+ // Merge the deleted row count of the old DMS to the RowSetMetadata if necessary.
+ rowset_metadata_->IncrementLiveRows(-deleted_row_count_);
+
RETURN_NOT_OK(rowset_metadata_->CommitRedoDeltaDataBlock(dms->id(), block_id));
if (flush_type == FLUSH_METADATA) {
RETURN_NOT_OK_PREPEND(rowset_metadata_->Flush(),
@@ -758,6 +762,7 @@ Status DeltaTracker::Flush(const IOContext* io_context, MetadataFlushType flush_
return Status::OK();
}
+ deleted_row_count_ = old_dms->deleted_row_count();
redo_delta_stores_.push_back(old_dms);
}
@@ -789,6 +794,7 @@ Status DeltaTracker::Flush(const IOContext* io_context, MetadataFlushType flush_
CHECK_EQ(redo_delta_stores_[idx], old_dms)
<< "Another thread modified the delta store list during flush";
redo_delta_stores_[idx] = dfr;
+ deleted_row_count_ = 0;
}
return Status::OK();
@@ -862,6 +868,12 @@ Status DeltaTracker::InitAllDeltaStoresForTests(WhichStores stores) {
return Status::OK();
}
+int64_t DeltaTracker::CountDeletedRows() const {
+ shared_lock<rw_spinlock> lock(component_lock_);
+ DCHECK_GE(deleted_row_count_, 0);
+ return deleted_row_count_ + dms_->deleted_row_count();
+}
+
string DeltaTracker::LogPrefix() const {
return Substitute("T $0 P $1: ",
rowset_metadata_->tablet_metadata()->tablet_id(),
diff --git a/src/kudu/tablet/delta_tracker.h b/src/kudu/tablet/delta_tracker.h
index 36aa59e..f1dabb5 100644
--- a/src/kudu/tablet/delta_tracker.h
+++ b/src/kudu/tablet/delta_tracker.h
@@ -266,6 +266,10 @@ class DeltaTracker {
// Init() all of the specified delta stores. For tests only.
Status InitAllDeltaStoresForTests(WhichStores stores);
+ // Count the number of deleted rows in the current DMS as well as
+ // in a flushing DMS (if one exists)
+ int64_t CountDeletedRows() const;
+
private:
FRIEND_TEST(TestRowSet, TestRowSetUpdate);
FRIEND_TEST(TestRowSet, TestDMSFlush);
@@ -363,6 +367,11 @@ class DeltaTracker {
// TODO(perf): this needs to be more fine grained
mutable Mutex compact_flush_lock_;
+ // Number of deleted rows for a DMS that is currently being flushed.
+ // When the flush completes, this is merged into the RowSetMetadata
+ // and reset.
+ int64_t deleted_row_count_;
+
DISALLOW_COPY_AND_ASSIGN(DeltaTracker);
};
diff --git a/src/kudu/tablet/deltamemstore-test.cc b/src/kudu/tablet/deltamemstore-test.cc
index ba7adfe..5ea72a6 100644
--- a/src/kudu/tablet/deltamemstore-test.cc
+++ b/src/kudu/tablet/deltamemstore-test.cc
@@ -669,5 +669,27 @@ TEST_F(TestDeltaMemStore, TestFuzz) {
/*test_filter_column_ids_and_collect_deltas=*/false));
}
+TEST_F(TestDeltaMemStore, TestDeletedRowCount) {
+ const int kNumUpdates = 10000;
+
+ faststring buf;
+ RowChangeListEncoder update(&buf);
+ for (rowid_t row_idx = 0; row_idx < kNumUpdates; row_idx++) {
+ // UPDATE.
+ uint32_t new_val = row_idx;
+ update.Reset();
+ update.AddColumnUpdate(schema_.column(kIntColumn), schema_.column_id(kIntColumn), &new_val);
+ ASSERT_OK(dms_->Update(Timestamp(row_idx), row_idx, RowChangeList(buf), op_id_));
+
+ // DELETE.
+ if (row_idx % 2 == 0) {
+ update.Reset();
+ update.SetToDelete();
+ ASSERT_OK(dms_->Update(Timestamp(row_idx + 1), row_idx, RowChangeList(buf), op_id_));
+ }
+ }
+ ASSERT_EQ(kNumUpdates / 2, dms_->deleted_row_count());
+}
+
} // namespace tablet
} // namespace kudu
diff --git a/src/kudu/tablet/deltamemstore.cc b/src/kudu/tablet/deltamemstore.cc
index 6f65810..c5d2009 100644
--- a/src/kudu/tablet/deltamemstore.cc
+++ b/src/kudu/tablet/deltamemstore.cc
@@ -80,7 +80,8 @@ DeltaMemStore::DeltaMemStore(int64_t id,
tree_(arena_),
anchorer_(log_anchor_registry,
Substitute("Rowset-$0/DeltaMemStore-$1", rs_id_, id_)),
- disambiguator_sequence_number_(0) {
+ disambiguator_sequence_number_(0),
+ deleted_row_count_(0) {
}
Status DeltaMemStore::Init(const IOContext* /*io_context*/) {
@@ -118,6 +119,10 @@ Status DeltaMemStore::Update(Timestamp timestamp,
anchorer_.AnchorIfMinimum(op_id.index());
+ if (update.is_delete()) {
+ deleted_row_count_.Increment();
+ }
+
return Status::OK();
}
@@ -191,6 +196,12 @@ void DeltaMemStore::DebugPrint() const {
tree_.DebugPrint();
}
+int64_t DeltaMemStore::deleted_row_count() const {
+ int64_t count = deleted_row_count_.Load();
+ DCHECK_GE(count, 0);
+ return count;
+}
+
////////////////////////////////////////////////////////////
// DMSIterator
////////////////////////////////////////////////////////////
diff --git a/src/kudu/tablet/deltamemstore.h b/src/kudu/tablet/deltamemstore.h
index b741d85..01b942f 100644
--- a/src/kudu/tablet/deltamemstore.h
+++ b/src/kudu/tablet/deltamemstore.h
@@ -147,6 +147,9 @@ class DeltaMemStore : public DeltaStore,
return delta_stats_;
}
+ // Returns the number of deleted rows in this DMS.
+ int64_t deleted_row_count() const;
+
private:
friend class DMSIterator;
@@ -181,6 +184,9 @@ class DeltaMemStore : public DeltaStore,
// number, and is only used in the case that such a collision occurs.
AtomicInt<Atomic32> disambiguator_sequence_number_;
+ // Number of deleted rows in this DMS.
+ AtomicInt<int64_t> deleted_row_count_;
+
DISALLOW_COPY_AND_ASSIGN(DeltaMemStore);
};
diff --git a/src/kudu/tablet/diskrowset-test.cc b/src/kudu/tablet/diskrowset-test.cc
index 8f98be4..9155e4b 100644
--- a/src/kudu/tablet/diskrowset-test.cc
+++ b/src/kudu/tablet/diskrowset-test.cc
@@ -511,15 +511,18 @@ TEST_F(TestRowSet, TestRollingDiskRowSetWriter) {
RollingDiskRowSetWriter writer(tablet()->metadata(), schema_,
BloomFilterSizing::BySizeAndFPRate(32*1024, 0.01f),
64 * 1024); // roll every 64KB
- DoWriteTestRowSet(10000, &writer);
+ DoWriteTestRowSet(FLAGS_roundtrip_num_rows, &writer);
// Should have rolled 4 times.
vector<shared_ptr<RowSetMetadata> > metas;
writer.GetWrittenRowSetMetadata(&metas);
EXPECT_EQ(4, metas.size());
+ int64_t count = 0;
for (const shared_ptr<RowSetMetadata>& meta : metas) {
ASSERT_TRUE(meta->HasDataForColumnIdForTests(schema_.column_id(0)));
+ count += meta->live_row_count();
}
+ ASSERT_EQ(FLAGS_roundtrip_num_rows, count);
}
TEST_F(TestRowSet, TestMakeDeltaIteratorMergerUnlocked) {
diff --git a/src/kudu/tablet/diskrowset.cc b/src/kudu/tablet/diskrowset.cc
index d6972cc..8143fc3 100644
--- a/src/kudu/tablet/diskrowset.cc
+++ b/src/kudu/tablet/diskrowset.cc
@@ -183,7 +183,7 @@ Status DiskRowSetWriter::InitAdHocIndexWriter() {
}
-Status DiskRowSetWriter::AppendBlock(const RowBlock &block) {
+Status DiskRowSetWriter::AppendBlock(const RowBlock &block, int live_row_count) {
DCHECK_EQ(block.schema()->num_columns(), schema_->num_columns());
CHECK(!finished_);
@@ -201,6 +201,9 @@ Status DiskRowSetWriter::AppendBlock(const RowBlock &block) {
// Write the batch to each of the columns
RETURN_NOT_OK(col_writer_->AppendBlock(block));
+ // Increase the live row count if necessary.
+ rowset_metadata_->IncrementLiveRows(live_row_count);
+
#ifndef NDEBUG
faststring prev_key;
#endif
@@ -383,9 +386,9 @@ Status RollingDiskRowSetWriter::RollIfNecessary() {
return Status::OK();
}
-Status RollingDiskRowSetWriter::AppendBlock(const RowBlock &block) {
+Status RollingDiskRowSetWriter::AppendBlock(const RowBlock &block, int live_row_count) {
DCHECK_EQ(state_, kStarted);
- RETURN_NOT_OK(cur_writer_->AppendBlock(block));
+ RETURN_NOT_OK(cur_writer_->AppendBlock(block, live_row_count));
written_count_ += block.nrows();
@@ -470,7 +473,6 @@ Status RollingDiskRowSetWriter::FinishCurrentWriter() {
}
written_size_ += cur_writer_->written_size();
-
written_drs_metas_.push_back(cur_drs_metadata_);
}
@@ -479,7 +481,6 @@ Status RollingDiskRowSetWriter::FinishCurrentWriter() {
cur_redo_writer_.reset(nullptr);
cur_drs_metadata_.reset();
-
return Status::OK();
}
@@ -753,6 +754,12 @@ Status DiskRowSet::CountRows(const IOContext* io_context, rowid_t *count) const
return Status::OK();
}
+Status DiskRowSet::CountLiveRows(int64_t* count) const {
+ *count = rowset_metadata_->live_row_count() - delta_tracker_->CountDeletedRows();
+ DCHECK_GE(*count, 0);
+ return Status::OK();
+}
+
Status DiskRowSet::GetBounds(std::string* min_encoded_key,
std::string* max_encoded_key) const {
DCHECK(open_);
diff --git a/src/kudu/tablet/diskrowset.h b/src/kudu/tablet/diskrowset.h
index 65ed8b3..391d82d 100644
--- a/src/kudu/tablet/diskrowset.h
+++ b/src/kudu/tablet/diskrowset.h
@@ -100,7 +100,8 @@ class DiskRowSetWriter {
// The block is written to all column writers as well as the bloom filter,
// if configured.
// Rows must be appended in ascending order.
- Status AppendBlock(const RowBlock &block);
+ // 'live_row_count' means the number of live rows in this input block.
+ Status AppendBlock(const RowBlock &block, int live_row_count = 0);
// Closes the CFiles and their underlying writable blocks.
// If no rows were written, returns Status::Aborted().
@@ -180,7 +181,8 @@ class RollingDiskRowSetWriter {
// you must append deltas using the APIs below *before* appending the block
// of rows that they correspond to. This ensures that the output delta files
// and data files are aligned.
- Status AppendBlock(const RowBlock &block);
+ // 'live_row_count' means the number of live rows in this input block.
+ Status AppendBlock(const RowBlock &block, int live_row_count = 0);
// Appends a sequence of REDO deltas for the same row to the current
// redo delta file. 'row_idx_in_next_block' is the positional index after
@@ -367,6 +369,9 @@ class DiskRowSet : public RowSet {
// yet set, consults the base data and stores the result in 'num_rows_'.
Status CountRows(const fs::IOContext* io_context, rowid_t *count) const final override;
+ // Count the number of live rows in this DRS.
+ virtual Status CountLiveRows(int64_t* count) const override;
+
// See RowSet::GetBounds(...)
virtual Status GetBounds(std::string* min_encoded_key,
std::string* max_encoded_key) const override;
diff --git a/src/kudu/tablet/memrowset-test.cc b/src/kudu/tablet/memrowset-test.cc
index 1ddc8c9..f32002f 100644
--- a/src/kudu/tablet/memrowset-test.cc
+++ b/src/kudu/tablet/memrowset-test.cc
@@ -817,5 +817,36 @@ TEST_F(TestMemRowSet, TestMemRowSetUpdatePerformance) {
}
}
+TEST_F(TestMemRowSet, TestCountLiveRows) {
+ shared_ptr<MemRowSet> mrs;
+ ASSERT_OK(MemRowSet::Create(0, schema_, log_anchor_registry_.get(),
+ MemTracker::GetRootTracker(), &mrs));
+
+ const auto CheckLiveRowsCount = [&](int64_t expect) {
+ int64_t count = 0;
+ ASSERT_OK(mrs->CountLiveRows(&count));
+ ASSERT_EQ(expect, count);
+ };
+
+ NO_FATALS(CheckLiveRowsCount(0));
+ ASSERT_OK(GenerateTestData(mrs.get()));
+ NO_FATALS(CheckLiveRowsCount(4));
+
+ ASSERT_OK(InsertRow(mrs.get(), "liverow 0", 0));
+ NO_FATALS(CheckLiveRowsCount(5));
+ ASSERT_OK(InsertRow(mrs.get(), "liverow 1", 0));
+ NO_FATALS(CheckLiveRowsCount(6));
+
+ OperationResultPB result;
+ ASSERT_OK(DeleteRow(mrs.get(), "liverow 0", &result));
+ NO_FATALS(CheckLiveRowsCount(5));
+
+ ASSERT_OK(InsertRow(mrs.get(), "liverow 0", 0));
+ NO_FATALS(CheckLiveRowsCount(6));
+
+ ASSERT_OK(UpdateRow(mrs.get(), "liverow 0", 1, &result));
+ NO_FATALS(CheckLiveRowsCount(6));
+}
+
} // namespace tablet
} // namespace kudu
diff --git a/src/kudu/tablet/memrowset.cc b/src/kudu/tablet/memrowset.cc
index 71904cd..a08d506 100644
--- a/src/kudu/tablet/memrowset.cc
+++ b/src/kudu/tablet/memrowset.cc
@@ -116,7 +116,8 @@ MemRowSet::MemRowSet(int64_t id,
debug_insert_count_(0),
debug_update_count_(0),
anchorer_(log_anchor_registry, Substitute("MemRowSet-$0", id_)),
- has_been_compacted_(false) {
+ has_been_compacted_(false),
+ live_row_count_(0) {
CHECK(schema.has_column_ids());
ANNOTATE_BENIGN_RACE(&debug_insert_count_, "insert count isnt accurate");
ANNOTATE_BENIGN_RACE(&debug_update_count_, "update count isnt accurate");
@@ -189,6 +190,7 @@ Status MemRowSet::Insert(Timestamp timestamp,
anchorer_.AnchorIfMinimum(op_id.index());
debug_insert_count_++;
+ live_row_count_.Increment();
return Status::OK();
}
@@ -208,6 +210,8 @@ Status MemRowSet::Reinsert(Timestamp timestamp, const ConstContiguousRow& row, M
// for the mutation are fully published before any concurrent reader sees
// the appended mutation.
mut->AppendToListAtomic(&ms_row->header_->redo_head, &ms_row->header_->redo_tail);
+
+ live_row_count_.Increment();
return Status::OK();
}
@@ -251,6 +255,9 @@ Status MemRowSet::MutateRow(Timestamp timestamp,
anchorer_.AnchorIfMinimum(op_id.index());
debug_update_count_++;
+ if (delta.is_delete()) {
+ live_row_count_.IncrementBy(-1);
+ }
return Status::OK();
}
diff --git a/src/kudu/tablet/memrowset.h b/src/kudu/tablet/memrowset.h
index bd57a14..752140d 100644
--- a/src/kudu/tablet/memrowset.h
+++ b/src/kudu/tablet/memrowset.h
@@ -40,6 +40,7 @@
#include "kudu/tablet/concurrent_btree.h"
#include "kudu/tablet/rowset.h"
#include "kudu/tablet/rowset_metadata.h"
+#include "kudu/util/atomic.h"
#include "kudu/util/faststring.h"
#include "kudu/util/memory/arena.h"
#include "kudu/util/monotime.h"
@@ -254,6 +255,12 @@ class MemRowSet : public RowSet,
return Status::OK();
}
+ virtual Status CountLiveRows(int64_t* count) const override {
+ *count = live_row_count_.Load();
+ DCHECK_GE(*count, 0);
+ return Status::OK();
+ }
+
virtual Status GetBounds(std::string *min_encoded_key,
std::string *max_encoded_key) const override;
@@ -451,6 +458,9 @@ class MemRowSet : public RowSet,
// and thus should not be scheduled for further compactions.
std::atomic<bool> has_been_compacted_;
+ // Number of live rows in this MRS.
+ AtomicInt<int64_t> live_row_count_;
+
DISALLOW_COPY_AND_ASSIGN(MemRowSet);
};
diff --git a/src/kudu/tablet/metadata.proto b/src/kudu/tablet/metadata.proto
index 8905249..57ff067 100644
--- a/src/kudu/tablet/metadata.proto
+++ b/src/kudu/tablet/metadata.proto
@@ -47,6 +47,9 @@ message RowSetDataPB {
optional BlockIdPB adhoc_index_block = 7;
optional bytes min_encoded_key = 8;
optional bytes max_encoded_key = 9;
+
+ // Number of live rows that have been persisted.
+ optional int64 live_row_count = 10;
}
// State flags indicating whether the tablet is in the middle of being copied
@@ -131,6 +134,11 @@ message TabletSuperBlockPB {
// from a version of Kudu before 1.5.0. In this case, a new group will be
// created spanning all data directories.
optional DataDirGroupPB data_dir_group = 15;
+
+ // Whether the tablet supports counting live row. If false, 'live_row_count'
+ // may be inaccurate and should be ignored. It's only supported for the newly
+ // created ones, not for the ancient ones.
+ optional bool supports_live_row_count = 16;
}
// Tablet states represent stages of a TabletReplica's object lifecycle and are
diff --git a/src/kudu/tablet/mock-rowsets.h b/src/kudu/tablet/mock-rowsets.h
index 01576d5..c22d44d 100644
--- a/src/kudu/tablet/mock-rowsets.h
+++ b/src/kudu/tablet/mock-rowsets.h
@@ -70,6 +70,10 @@ class MockRowSet : public RowSet {
LOG(FATAL) << "Unimplemented";
return Status::OK();
}
+ virtual Status CountLiveRows(int64_t* /*count*/) const OVERRIDE {
+ LOG(FATAL) << "Unimplemented";
+ return Status::OK();
+ }
virtual std::string ToString() const OVERRIDE {
LOG(FATAL) << "Unimplemented";
return "";
diff --git a/src/kudu/tablet/rowset.cc b/src/kudu/tablet/rowset.cc
index 78cb199..62c1131 100644
--- a/src/kudu/tablet/rowset.cc
+++ b/src/kudu/tablet/rowset.cc
@@ -230,6 +230,15 @@ Status DuplicatingRowSet::CountRows(const IOContext* io_context, rowid_t *count)
return Status::OK();
}
+Status DuplicatingRowSet::CountLiveRows(int64_t* count) const {
+ for (const shared_ptr<RowSet>& rs : old_rowsets_) {
+ int64_t tmp = 0;
+ RETURN_NOT_OK(rs->CountLiveRows(&tmp));
+ *count += tmp;
+ }
+ return Status::OK();
+}
+
Status DuplicatingRowSet::GetBounds(string* min_encoded_key,
string* max_encoded_key) const {
// The range out of the output rowset always spans the full range
diff --git a/src/kudu/tablet/rowset.h b/src/kudu/tablet/rowset.h
index baecb4a..2cfccaf 100644
--- a/src/kudu/tablet/rowset.h
+++ b/src/kudu/tablet/rowset.h
@@ -163,6 +163,9 @@ class RowSet {
// Count the number of rows in this rowset.
virtual Status CountRows(const fs::IOContext* io_context, rowid_t *count) const = 0;
+ // Count the number of live rows in this rowset.
+ virtual Status CountLiveRows(int64_t* count) const = 0;
+
// Return the bounds for this RowSet. 'min_encoded_key' and 'max_encoded_key'
// are set to the first and last encoded keys for this RowSet.
//
@@ -406,6 +409,8 @@ class DuplicatingRowSet : public RowSet {
Status CountRows(const fs::IOContext* io_context, rowid_t *count) const OVERRIDE;
+ virtual Status CountLiveRows(int64_t* count) const OVERRIDE;
+
virtual Status GetBounds(std::string* min_encoded_key,
std::string* max_encoded_key) const OVERRIDE;
diff --git a/src/kudu/tablet/rowset_metadata.cc b/src/kudu/tablet/rowset_metadata.cc
index 440ef93..6bd19ce 100644
--- a/src/kudu/tablet/rowset_metadata.cc
+++ b/src/kudu/tablet/rowset_metadata.cc
@@ -118,6 +118,11 @@ void RowSetMetadata::LoadFromPB(const RowSetDataPB& pb) {
for (const DeltaDataPB& undo_delta_pb : pb.undo_deltas()) {
undo_delta_blocks_.push_back(BlockId::FromPB(undo_delta_pb.block()));
}
+
+ // Load live row count.
+ if (tablet_metadata_->supports_live_row_count()) {
+ live_row_count_ = pb.live_row_count();
+ }
}
void RowSetMetadata::ToProtobuf(RowSetDataPB *pb) {
@@ -163,6 +168,11 @@ void RowSetMetadata::ToProtobuf(RowSetDataPB *pb) {
pb->set_min_encoded_key(*min_encoded_key_);
pb->set_max_encoded_key(*max_encoded_key_);
}
+
+ // Write the live row count.
+ if (tablet_metadata_->supports_live_row_count()) {
+ pb->set_live_row_count(live_row_count_);
+ }
}
const std::string RowSetMetadata::ToString() const {
@@ -268,6 +278,20 @@ void RowSetMetadata::CommitUpdate(const RowSetMetadataUpdate& update,
blocks_by_col_id_.shrink_to_fit();
}
+void RowSetMetadata::IncrementLiveRows(int64_t row_count) {
+ if (tablet_metadata_->supports_live_row_count() && row_count != 0) {
+ std::lock_guard<LockType> l(lock_);
+ live_row_count_ += row_count;
+ DCHECK_GE(live_row_count_, 0);
+ }
+}
+
+int64_t RowSetMetadata::live_row_count() const {
+ std::lock_guard<LockType> l(lock_);
+ DCHECK_GE(live_row_count_, 0);
+ return live_row_count_;
+}
+
vector<BlockId> RowSetMetadata::GetAllBlocks() {
vector<BlockId> blocks;
std::lock_guard<LockType> l(lock_);
diff --git a/src/kudu/tablet/rowset_metadata.h b/src/kudu/tablet/rowset_metadata.h
index c0d156e..94af6f3 100644
--- a/src/kudu/tablet/rowset_metadata.h
+++ b/src/kudu/tablet/rowset_metadata.h
@@ -224,6 +224,15 @@ class RowSetMetadata {
std::vector<BlockId> GetAllBlocks();
+ // Increase the row count.
+ // Note:
+ // 1) A positive number means plus the live rows,
+ // 2) A negative number means minus the deleted rows.
+ void IncrementLiveRows(int64_t row_count);
+
+ // Returns the number of live rows in this metadata.
+ int64_t live_row_count() const;
+
private:
friend class TabletMetadata;
@@ -232,7 +241,8 @@ class RowSetMetadata {
explicit RowSetMetadata(TabletMetadata *tablet_metadata)
: tablet_metadata_(tablet_metadata),
initted_(false),
- last_durable_redo_dms_id_(kNoDurableMemStore) {
+ last_durable_redo_dms_id_(kNoDurableMemStore),
+ live_row_count_(0) {
}
RowSetMetadata(TabletMetadata *tablet_metadata,
@@ -240,7 +250,8 @@ class RowSetMetadata {
: tablet_metadata_(DCHECK_NOTNULL(tablet_metadata)),
initted_(true),
id_(id),
- last_durable_redo_dms_id_(kNoDurableMemStore) {
+ last_durable_redo_dms_id_(kNoDurableMemStore),
+ live_row_count_(0) {
}
Status InitFromPB(const RowSetDataPB& pb);
@@ -266,6 +277,9 @@ class RowSetMetadata {
int64_t last_durable_redo_dms_id_;
+ // Number of live rows on disk, excluding those in [MRS/DMS].
+ int64_t live_row_count_;
+
DISALLOW_COPY_AND_ASSIGN(RowSetMetadata);
};
diff --git a/src/kudu/tablet/tablet-test-util.h b/src/kudu/tablet/tablet-test-util.h
index 267a18c..62167d5 100644
--- a/src/kudu/tablet/tablet-test-util.h
+++ b/src/kudu/tablet/tablet-test-util.h
@@ -336,7 +336,7 @@ static Status WriteRow(const Slice &row_slice, RowSetWriterClass *writer) {
RowBlockRow dst_row = block.row(0);
RETURN_NOT_OK(CopyRow(row, &dst_row, static_cast<Arena*>(nullptr)));
- return writer->AppendBlock(block);
+ return writer->AppendBlock(block, 1);
}
// Tracks encoded deltas and provides a DeltaIterator-like interface for
diff --git a/src/kudu/tablet/tablet-test.cc b/src/kudu/tablet/tablet-test.cc
index 4ffa081..3f46522 100644
--- a/src/kudu/tablet/tablet-test.cc
+++ b/src/kudu/tablet/tablet-test.cc
@@ -91,13 +91,18 @@ template<class SETUP>
class TestTablet : public TabletTestBase<SETUP> {
typedef SETUP Type;
- public:
+public:
// Verify that iteration doesn't fail
void CheckCanIterate() {
vector<string> out_rows;
ASSERT_OK(this->IterateToStringList(&out_rows));
}
+ void CheckLiveRowsCount(int64_t expect) {
+ int64_t count = 0;
+ ASSERT_OK(this->tablet()->CountLiveRows(&count));
+ ASSERT_EQ(expect, count);
+ }
};
TYPED_TEST_CASE(TestTablet, TabletTestHelperTypes);
@@ -111,6 +116,7 @@ TYPED_TEST(TestTablet, TestFlush) {
ASSERT_EQ(0, this->tablet()->OnDiskDataSize());
ASSERT_GT(this->tablet()->OnDiskSize(), 0);
ASSERT_EQ(this->tablet()->metadata()->on_disk_size(), this->tablet()->OnDiskSize());
+ NO_FATALS(this->CheckLiveRowsCount(max_rows));
// Flush it.
ASSERT_OK(this->tablet()->Flush());
@@ -120,6 +126,7 @@ TYPED_TEST(TestTablet, TestFlush) {
// on-disk data size due to per-diskrowset metadata and tablet metadata.
ASSERT_GT(this->tablet()->OnDiskDataSize(), 0);
ASSERT_GT(this->tablet()->OnDiskSize(), this->tablet()->OnDiskDataSize());
+ NO_FATALS(this->CheckLiveRowsCount(max_rows));
// Make sure the files were created as expected.
RowSetMetadata* rowset_meta = tablet_meta->GetRowSetForTests(0);
@@ -210,6 +217,7 @@ TYPED_TEST(TestTablet, TestInsertsAndMutationsAreUndoneWithMVCCAfterFlush) {
// now flush and the compact everything
ASSERT_OK(this->tablet()->Flush());
ASSERT_OK(this->tablet()->Compact(Tablet::FORCE_COMPACT_ALL));
+ NO_FATALS(this->CheckLiveRowsCount(4));
// Now verify that with undos and redos we get the same thing.
VerifySnapshotsHaveSameResult(this->tablet().get(), this->client_schema_,
@@ -244,6 +252,8 @@ TYPED_TEST(TestTablet, TestGhostRowsOnDiskRowSets) {
// Should still be able to update, since the row is live.
ASSERT_OK(this->UpdateTestRow(&writer, 0, 1));
+
+ NO_FATALS(this->CheckLiveRowsCount(1));
}
// Test that inserting a row which already exists causes an AlreadyPresent
@@ -259,15 +269,18 @@ TYPED_TEST(TestTablet, TestInsertDuplicateKey) {
ASSERT_STR_CONTAINS(s.ToString(), "key already present");
ASSERT_EQ(1, this->TabletCount());
+ NO_FATALS(this->CheckLiveRowsCount(1));
// Flush, and make sure that inserting duplicate still fails
ASSERT_OK(this->tablet()->Flush());
ASSERT_EQ(1, this->TabletCount());
+ NO_FATALS(this->CheckLiveRowsCount(1));
s = this->InsertTestRow(&writer, 12345, 0);
ASSERT_STR_CONTAINS(s.ToString(), "key already present");
ASSERT_EQ(1, this->TabletCount());
+ NO_FATALS(this->CheckLiveRowsCount(1));
}
// Tests that we are able to handle reinserts properly.
@@ -330,7 +343,7 @@ TYPED_TEST(TestTablet, TestReinserts) {
ASSERT_STR_CONTAINS((*expected_rows[3])[0], "int32 key_idx=1, int32 val=1)");
ASSERT_EQ(expected_rows[4]->size(), 0) << "Got the wrong result from snap: "
<< snaps[4].ToString();
-
+ NO_FATALS(this->CheckLiveRowsCount(0));
STLDeleteElements(&expected_rows);
}
@@ -372,6 +385,7 @@ TYPED_TEST(TestTablet, TestDeleteWithFlushAndCompact) {
ASSERT_EQ(0L, writer.last_op_result().mutated_stores(0).dms_id());
ASSERT_OK(this->IterateToStringList(&rows));
ASSERT_EQ(0, rows.size());
+ NO_FATALS(this->CheckLiveRowsCount(0));
// We now have an INSERT in the MemRowSet and the
// deleted row in the DiskRowSet. The new version
@@ -392,6 +406,7 @@ TYPED_TEST(TestTablet, TestDeleteWithFlushAndCompact) {
ASSERT_OK(this->IterateToStringList(&rows));
ASSERT_EQ(1, rows.size());
EXPECT_EQ(this->setup_.FormatDebugRow(0, 2, false), rows[0]);
+ NO_FATALS(this->CheckLiveRowsCount(1));
}
// Test flushes dealing with REINSERT mutations in the MemRowSet.
@@ -708,6 +723,7 @@ TYPED_TEST(TestTablet, TestInsertsPersist) {
this->InsertTestRows(0, max_rows, 0);
ASSERT_EQ(max_rows, this->TabletCount());
+ NO_FATALS(this->CheckLiveRowsCount(max_rows));
// Get current timestamp.
Timestamp t = this->tablet()->clock()->Now();
@@ -716,17 +732,19 @@ TYPED_TEST(TestTablet, TestInsertsPersist) {
ASSERT_OK(this->tablet()->Flush());
ASSERT_EQ(max_rows, this->TabletCount());
+ NO_FATALS(this->CheckLiveRowsCount(max_rows));
// Close and re-open tablet.
- // TODO: Should we be reopening the tablet in a different way to persist the
+ // TODO(unknown): Should we be reopening the tablet in a different way to persist the
// clock / timestamps?
this->TabletReOpen();
// Ensure that rows exist
ASSERT_EQ(max_rows, this->TabletCount());
+ NO_FATALS(this->CheckLiveRowsCount(max_rows));
this->VerifyTestRowsWithTimestampAndVerifier(0, max_rows, t, boost::none);
- // TODO: add some more data, re-flush
+ // TODO(unknown): add some more data, re-flush
}
TYPED_TEST(TestTablet, TestUpsert) {
@@ -754,6 +772,7 @@ TYPED_TEST(TestTablet, TestUpsert) {
EXPECT_EQ(2, upserts_as_updates->value());
ASSERT_OK(this->IterateToStringList(&rows));
EXPECT_EQ(vector<string>{ this->setup_.FormatDebugRow(0, 1002, false) }, rows);
+ NO_FATALS(this->CheckLiveRowsCount(1));
}
@@ -866,6 +885,7 @@ TYPED_TEST(TestTablet, TestCompaction) {
// Compaction does not swap the memrowsets so we should still get 3
ASSERT_EQ(3, this->tablet()->CurrentMrsIdForTests());
ASSERT_EQ(n_rows * 3, this->TabletCount());
+ NO_FATALS(this->CheckLiveRowsCount(n_rows * 3));
const RowSetMetadata *rowset_meta = this->tablet()->metadata()->GetRowSetForTests(3);
ASSERT_TRUE(rowset_meta != nullptr);
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index daee9a5..9d5909a 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -1908,6 +1908,25 @@ Status Tablet::CountRows(uint64_t *count) const {
return Status::OK();
}
+// Sums the live-row counts of the MemRowSet and of every rowset in the
+// tablet's components, storing the total in '*count'.
+//
+// Returns NotSupported if the tablet metadata reports that live row counting
+// is unavailable, or the first error returned by any component. '*count' is
+// written only on success.
+Status Tablet::CountLiveRows(int64_t* count) const {
+  if (!metadata_->supports_live_row_count()) {
+    return Status::NotSupported("This tablet doesn't support live row counting");
+  }
+
+  scoped_refptr<TabletComponents> components;
+  GetComponents(&components);
+
+  // Seed the running total with the MemRowSet's count, then add each rowset.
+  int64_t total = 0;
+  RETURN_NOT_OK(components->memrowset->CountLiveRows(&total));
+
+  int64_t rowset_count = 0;
+  for (const shared_ptr<RowSet>& rs : components->rowsets->all_rowsets()) {
+    RETURN_NOT_OK(rs->CountLiveRows(&rowset_count));
+    total += rowset_count;
+  }
+
+  *count = total;
+  return Status::OK();
+}
+
size_t Tablet::MemRowSetSize() const {
scoped_refptr<TabletComponents> comps;
GetComponents(&comps);
diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h
index c4a5691..8235daf 100644
--- a/src/kudu/tablet/tablet.h
+++ b/src/kudu/tablet/tablet.h
@@ -347,6 +347,9 @@ class Tablet {
// memrowset in the current implementation.
Status CountRows(uint64_t *count) const;
+ // Count the number of live rows in this tablet.
+ Status CountLiveRows(int64_t* count) const;
+
// Verbosely dump this entire tablet to the logs. This is only
// really useful when debugging unit tests failures where the tablet
// has a very small number of rows.
diff --git a/src/kudu/tablet/tablet_metadata.cc b/src/kudu/tablet/tablet_metadata.cc
index 7c536f7..ec0d628 100644
--- a/src/kudu/tablet/tablet_metadata.cc
+++ b/src/kudu/tablet/tablet_metadata.cc
@@ -90,6 +90,7 @@ Status TabletMetadata::CreateNew(FsManager* fs_manager,
const Partition& partition,
const TabletDataState& initial_tablet_data_state,
boost::optional<OpId> tombstone_last_logged_opid,
+ bool supports_live_row_count,
scoped_refptr<TabletMetadata>* metadata) {
// Verify that no existing tablet exists with the same ID.
@@ -110,7 +111,8 @@ Status TabletMetadata::CreateNew(FsManager* fs_manager,
partition_schema,
partition,
initial_tablet_data_state,
- std::move(tombstone_last_logged_opid)));
+ std::move(tombstone_last_logged_opid),
+ supports_live_row_count));
RETURN_NOT_OK(ret->Flush());
dir_group_cleanup.cancel();
@@ -149,7 +151,9 @@ Status TabletMetadata::LoadOrCreate(FsManager* fs_manager,
if (s.IsNotFound()) {
return CreateNew(fs_manager, tablet_id, table_name, table_id, schema,
partition_schema, partition, initial_tablet_data_state,
- std::move(tombstone_last_logged_opid), metadata);
+ std::move(tombstone_last_logged_opid),
+ /*supports_live_row_count=*/ true,
+ metadata);
}
return s;
}
@@ -266,7 +270,8 @@ TabletMetadata::TabletMetadata(FsManager* fs_manager, string tablet_id,
const Schema& schema, PartitionSchema partition_schema,
Partition partition,
const TabletDataState& tablet_data_state,
- boost::optional<OpId> tombstone_last_logged_opid)
+ boost::optional<OpId> tombstone_last_logged_opid,
+ bool supports_live_row_count)
: state_(kNotWrittenYet),
tablet_id_(std::move(tablet_id)),
table_id_(std::move(table_id)),
@@ -283,7 +288,8 @@ TabletMetadata::TabletMetadata(FsManager* fs_manager, string tablet_id,
num_flush_pins_(0),
needs_flush_(false),
flush_count_for_tests_(0),
- pre_flush_callback_(Bind(DoNothingStatusClosure)) {
+ pre_flush_callback_(Bind(DoNothingStatusClosure)),
+ supports_live_row_count_(supports_live_row_count) {
CHECK(schema_->has_column_ids());
CHECK_GT(schema_->num_key_columns(), 0);
}
@@ -302,7 +308,8 @@ TabletMetadata::TabletMetadata(FsManager* fs_manager, string tablet_id)
num_flush_pins_(0),
needs_flush_(false),
flush_count_for_tests_(0),
- pre_flush_callback_(Bind(DoNothingStatusClosure)) {}
+ pre_flush_callback_(Bind(DoNothingStatusClosure)),
+ supports_live_row_count_(false) {}
Status TabletMetadata::LoadFromDisk() {
TRACE_EVENT1("tablet", "TabletMetadata::LoadFromDisk",
@@ -386,6 +393,9 @@ Status TabletMetadata::LoadFromSuperBlock(const TabletSuperBlockPB& superblock)
tablet_data_state_ = superblock.tablet_data_state();
+ // This field should be parsed before parsing RowSetDataPB.
+ supports_live_row_count_ = superblock.supports_live_row_count();
+
rowsets_.clear();
for (const RowSetDataPB& rowset_pb : superblock.rowsets()) {
unique_ptr<RowSetMetadata> rowset_meta;
@@ -677,6 +687,8 @@ Status TabletMetadata::ToSuperBlockUnlocked(TabletSuperBlockPB* super_block,
pb.mutable_data_dir_group()->Swap(&group_pb);
}
+ pb.set_supports_live_row_count(supports_live_row_count_);
+
super_block->Swap(&pb);
return Status::OK();
}
diff --git a/src/kudu/tablet/tablet_metadata.h b/src/kudu/tablet/tablet_metadata.h
index adf0d4c..1462950 100644
--- a/src/kudu/tablet/tablet_metadata.h
+++ b/src/kudu/tablet/tablet_metadata.h
@@ -81,6 +81,7 @@ class TabletMetadata : public RefCountedThreadSafe<TabletMetadata> {
const Partition& partition,
const TabletDataState& initial_tablet_data_state,
boost::optional<consensus::OpId> tombstone_last_logged_opid,
+ bool supports_live_row_count,
scoped_refptr<TabletMetadata>* metadata);
// Load existing metadata from disk.
@@ -263,6 +264,15 @@ class TabletMetadata : public RefCountedThreadSafe<TabletMetadata> {
return flush_count_for_tests_;
}
+ // For testing only.
+ // Overwrites the in-memory 'supports_live_row_count_' flag with the given value.
+ void set_supports_live_row_count_for_tests(bool supports_live_row_count) {
+ supports_live_row_count_ = supports_live_row_count;
+ }
+
+ // Returns true if this tablet supports live row counting.
+ bool supports_live_row_count() const {
+ return supports_live_row_count_;
+ }
+
private:
friend class RefCountedThreadSafe<TabletMetadata>;
friend class MetadataTest;
@@ -279,7 +289,8 @@ class TabletMetadata : public RefCountedThreadSafe<TabletMetadata> {
const Schema& schema, PartitionSchema partition_schema,
Partition partition,
const TabletDataState& tablet_data_state,
- boost::optional<consensus::OpId> tombstone_last_logged_opid);
+ boost::optional<consensus::OpId> tombstone_last_logged_opid,
+ bool supports_live_row_count);
// Constructor for loading an existing tablet.
TabletMetadata(FsManager* fs_manager, std::string tablet_id);
@@ -387,6 +398,9 @@ class TabletMetadata : public RefCountedThreadSafe<TabletMetadata> {
// call to Flush() or LoadFromDisk().
std::atomic<int64_t> on_disk_size_;
+ // The tablet supports live row counting if true.
+ bool supports_live_row_count_;
+
DISALLOW_COPY_AND_ASSIGN(TabletMetadata);
};
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 471ef56..65829e9 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -1677,6 +1677,7 @@ TEST_F(ToolTest, TestLocalReplicaDumpDataDirs) {
kSchemaWithIds, partition.first, partition.second,
tablet::TABLET_DATA_READY,
/*tombstone_last_logged_opid=*/ boost::none,
+ /*supports_live_row_count=*/ true,
&meta));
string stdout;
NO_FATALS(RunActionStdoutString(Substitute("local_replica dump data_dirs $0 "
@@ -1711,6 +1712,7 @@ TEST_F(ToolTest, TestLocalReplicaDumpMeta) {
kSchemaWithIds, partition.first, partition.second,
tablet::TABLET_DATA_READY,
/*tombstone_last_logged_opid=*/ boost::none,
+ /*supports_live_row_count=*/ true,
&meta);
string stdout;
NO_FATALS(RunActionStdoutString(Substitute("local_replica dump meta $0 "
diff --git a/src/kudu/tserver/tablet_copy-test-base.h b/src/kudu/tserver/tablet_copy-test-base.h
index b4db99c..7319a6c 100644
--- a/src/kudu/tserver/tablet_copy-test-base.h
+++ b/src/kudu/tserver/tablet_copy-test-base.h
@@ -110,7 +110,7 @@ class TabletCopyTest : public TabletServerTestBase {
// Generate the test data for the tablet and do the flushing we assume will be
// done in the unit tests for tablet copy.
- void GenerateTestData() {
+ virtual void GenerateTestData() {
const int kIncr = 50;
LOG_TIMING(INFO, "Loading test data") {
for (int row_id = 0; row_id < kNumLogRolls * kIncr; row_id += kIncr) {
diff --git a/src/kudu/tserver/tablet_copy_client-test.cc b/src/kudu/tserver/tablet_copy_client-test.cc
index 8c880ce..5dd9f4e 100644
--- a/src/kudu/tserver/tablet_copy_client-test.cc
+++ b/src/kudu/tserver/tablet_copy_client-test.cc
@@ -14,7 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-#include "kudu/tserver/tablet_copy-test-base.h"
+#include "kudu/tserver/tablet_copy_client.h"
#include <cstdint>
#include <limits>
@@ -53,8 +53,8 @@
#include "kudu/tablet/metadata.pb.h"
#include "kudu/tablet/tablet_metadata.h"
#include "kudu/tablet/tablet_replica.h"
+#include "kudu/tserver/tablet_copy-test-base.h"
#include "kudu/tserver/tablet_copy.pb.h"
-#include "kudu/tserver/tablet_copy_client.h"
#include "kudu/util/crc.h"
#include "kudu/util/env.h"
#include "kudu/util/env_util.h"
@@ -63,9 +63,11 @@
#include "kudu/util/monotime.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/path_util.h"
+#include "kudu/util/random.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
using std::shared_ptr;
using std::string;
@@ -143,6 +145,14 @@ class TabletCopyClientTest : public TabletCopyTest {
protected:
Status CompareFileContents(const string& path1, const string& path2);
+  // Overrides TabletCopyTest::GenerateTestData() to randomly set or clear
+  // 'supports_live_row_count' on tablet_replica_'s metadata before delegating
+  // to the base implementation, so tests run against both flag values.
+  void GenerateTestData() override {
+    Random rand(SeedRandom());
+    // Odd draws enable the flag and even draws disable it.
+    const bool supports_live_row_count = (rand.Next() % 2) != 0;
+    // The setter contains no gtest assertions, so it needs no NO_FATALS() wrapper.
+    tablet_replica_->tablet_metadata()->
+        set_supports_live_row_count_for_tests(supports_live_row_count);
+    NO_FATALS(TabletCopyTest::GenerateTestData());
+  }
+
MetricRegistry metric_registry_;
scoped_refptr<MetricEntity> metric_entity_;
unique_ptr<FsManager> fs_manager_;
@@ -401,6 +411,12 @@ TEST_F(TabletCopyClientTest, TestFailedDiskStopsClient) {
ASSERT_TRUE(s.IsIOError());
}
+// Checks that, once StartCopy() succeeds, 'meta_' and tablet_replica_'s
+// metadata agree on whether live row counting is supported.
+TEST_F(TabletCopyClientTest, TestSupportsLiveRowCount) {
+  ASSERT_OK(StartCopy());
+
+  const bool meta_supports = meta_->supports_live_row_count();
+  const bool replica_supports =
+      tablet_replica_->tablet_metadata()->supports_live_row_count();
+  ASSERT_EQ(meta_supports, replica_supports);
+}
+
enum DownloadBlocks {
kDownloadBlocks, // Fetch blocks from remote.
kNoDownloadBlocks, // Do not fetch blocks from remote.
diff --git a/src/kudu/tserver/tablet_copy_client.cc b/src/kudu/tserver/tablet_copy_client.cc
index 6c3669c..f4fbe48 100644
--- a/src/kudu/tserver/tablet_copy_client.cc
+++ b/src/kudu/tserver/tablet_copy_client.cc
@@ -360,6 +360,7 @@ Status TabletCopyClient::Start(const HostPort& copy_source_addr,
partition,
superblock_->tablet_data_state(),
superblock_->tombstone_last_logged_opid(),
+ remote_superblock_->supports_live_row_count(),
&meta_));
TRACE("Wrote new tablet metadata");
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index 9e83b50..9531f09 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -442,6 +442,7 @@ Status TSTabletManager::CreateNewTablet(const string& table_id,
partition,
TABLET_DATA_READY,
boost::none,
+ /*supports_live_row_count=*/ true,
&meta),
"Couldn't create tablet metadata");