You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2022/12/15 16:11:13 UTC
[kudu] branch master updated: [tablet] clean up on Tablet::FlushBiggestDMS()
This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 21f249b86 [tablet] clean up on Tablet::FlushBiggestDMS()
21f249b86 is described below
commit 21f249b867e1f6bb3a9276c6b55d811a7068d19e
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Tue Dec 13 12:22:21 2022 -0800
[tablet] clean up on Tablet::FlushBiggestDMS()
While looking into the code in tablet.{cc,h} related to various
maintenance operations, I found some room for improvement.
This patch updates the code by:
* renaming Tablet::FlushBiggestDMS --> Tablet::FlushBiggestDMSForTests
since that method is called only by tests
* wrapping FlushBiggestDMSForTests into ASSERT_OK in tests
* wrapping maintenance operations into std::unique_ptr
in {Tablet,TabletReplica}::maintenance_ops_ containers instead of
storing raw pointers
* using std::make_shared() instead of std::shared_ptr(new ...)
* using std::move() with std::shared_ptr where appropriate
* fixing assorted style-related nits
Change-Id: I1760671f32694801e018ac7f5854e3fb95a23876
Reviewed-on: http://gerrit.cloudera.org:8080/19355
Tested-by: Kudu Jenkins
Reviewed-by: Yifan Zhang <ch...@163.com>
---
src/kudu/integration-tests/alter_table-test.cc | 16 ++--
src/kudu/integration-tests/fuzz-itest.cc | 3 +-
.../integration-tests/tablet_history_gc-itest.cc | 2 +-
src/kudu/tablet/major_delta_compaction-test.cc | 20 ++---
src/kudu/tablet/mt-tablet-test.cc | 5 +-
src/kudu/tablet/tablet.cc | 89 +++++++++++-----------
src/kudu/tablet/tablet.h | 8 +-
src/kudu/tablet/tablet_history_gc-test.cc | 7 +-
src/kudu/tablet/tablet_replica-test.cc | 10 +--
src/kudu/tablet/tablet_replica.cc | 35 +++++----
src/kudu/tablet/tablet_replica.h | 9 ++-
src/kudu/tserver/tablet_server-test.cc | 6 +-
12 files changed, 104 insertions(+), 106 deletions(-)
diff --git a/src/kudu/integration-tests/alter_table-test.cc b/src/kudu/integration-tests/alter_table-test.cc
index 9d2e93fd0..ded8bff3b 100644
--- a/src/kudu/integration-tests/alter_table-test.cc
+++ b/src/kudu/integration-tests/alter_table-test.cc
@@ -370,12 +370,12 @@ class AlterTableTest : public KuduTest {
vector<unique_ptr<KuduPartialRow>> split_rows,
vector<pair<unique_ptr<KuduPartialRow>, unique_ptr<KuduPartialRow>>> bounds);
- void CheckMaintenancePriority(int32_t expect_priority) {
- for (auto op : tablet_replica_->maintenance_ops_) {
- ASSERT_EQ(op->priority(), expect_priority);
+ void CheckMaintenancePriority(int32_t expected_priority) {
+ for (auto& op : tablet_replica_->maintenance_ops_) {
+ ASSERT_EQ(expected_priority, op->priority());
}
- for (auto op : tablet_replica_->tablet()->maintenance_ops_) {
- ASSERT_EQ(op->priority(), expect_priority);
+ for (auto& op : tablet_replica_->tablet()->maintenance_ops_) {
+ ASSERT_EQ(expected_priority, op->priority());
}
}
@@ -1035,7 +1035,7 @@ TEST_F(AlterTableTest, TestMajorCompactDeltasAfterUpdatingRemovedColumn) {
UpdateRow(0, { {"c1", 54321} });
// Make sure the delta is in a delta-file.
- ASSERT_OK(tablet_replica_->tablet()->FlushBiggestDMS());
+ ASSERT_OK(tablet_replica_->tablet()->FlushBiggestDMSForTests());
// Drop c1.
LOG(INFO) << "Dropping c1";
@@ -1085,7 +1085,7 @@ TEST_F(AlterTableTest, TestMajorCompactDeltasIntoMissingBaseData) {
UpdateRow(0, { {"c2", 54321} });
// Make sure the delta is in a delta-file.
- ASSERT_OK(tablet_replica_->tablet()->FlushBiggestDMS());
+ ASSERT_OK(tablet_replica_->tablet()->FlushBiggestDMSForTests());
NO_FATALS(ScanToStrings(&rows));
ASSERT_EQ(2, rows.size());
@@ -1135,7 +1135,7 @@ TEST_F(AlterTableTest, TestMajorCompactDeltasAfterAddUpdateRemoveColumn) {
UpdateRow(0, { {"c2", 54321} });
// Make sure the delta is in a delta-file.
- ASSERT_OK(tablet_replica_->tablet()->FlushBiggestDMS());
+ ASSERT_OK(tablet_replica_->tablet()->FlushBiggestDMSForTests());
NO_FATALS(ScanToStrings(&rows));
ASSERT_EQ(1, rows.size());
diff --git a/src/kudu/integration-tests/fuzz-itest.cc b/src/kudu/integration-tests/fuzz-itest.cc
index df521b523..4e60d6d85 100644
--- a/src/kudu/integration-tests/fuzz-itest.cc
+++ b/src/kudu/integration-tests/fuzz-itest.cc
@@ -45,7 +45,6 @@
#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
#include "kudu/client/value.h"
#include "kudu/client/write_op.h"
-#include "kudu/clock/clock.h"
#include "kudu/clock/logical_clock.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/partial_row.h"
@@ -1441,7 +1440,7 @@ void FuzzTest::RunFuzzCase(const vector<TestOp>& test_ops,
ASSERT_OK(tablet()->Flush());
break;
case TEST_FLUSH_DELTAS:
- ASSERT_OK(tablet()->FlushBiggestDMS());
+ ASSERT_OK(tablet()->FlushBiggestDMSForTests());
break;
case TEST_MAJOR_COMPACT_DELTAS:
ASSERT_OK(tablet()->CompactWorstDeltas(RowSet::MAJOR_DELTA_COMPACTION));
diff --git a/src/kudu/integration-tests/tablet_history_gc-itest.cc b/src/kudu/integration-tests/tablet_history_gc-itest.cc
index d1de72ff1..620216f66 100644
--- a/src/kudu/integration-tests/tablet_history_gc-itest.cc
+++ b/src/kudu/integration-tests/tablet_history_gc-itest.cc
@@ -895,7 +895,7 @@ TEST_F(RandomizedTabletHistoryGcITest, TestRandomHistoryGCWorkload) {
ASSERT_OK(tablet->Flush());
} else {
VLOG(1) << "Flushing biggest DMS";
- ASSERT_OK(tablet->FlushBiggestDMS());
+ ASSERT_OK(tablet->FlushBiggestDMSForTests());
}
break;
}
diff --git a/src/kudu/tablet/major_delta_compaction-test.cc b/src/kudu/tablet/major_delta_compaction-test.cc
index 48857bf7b..3223e42ad 100644
--- a/src/kudu/tablet/major_delta_compaction-test.cc
+++ b/src/kudu/tablet/major_delta_compaction-test.cc
@@ -20,6 +20,7 @@
#include <memory>
#include <ostream>
#include <string>
+#include <type_traits>
#include <vector>
#include <gflags/gflags_declare.h>
@@ -38,7 +39,6 @@
#include "kudu/tablet/mvcc.h"
#include "kudu/tablet/rowset.h"
#include "kudu/tablet/tablet-test-util.h"
-#include "kudu/tablet/tablet.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
@@ -190,7 +190,7 @@ TEST_F(TestMajorDeltaCompaction, TestKudu2656) {
shared_ptr<RowSet> rs = all_rowsets.front();
// Create some on-disk deltas.
NO_FATALS(UpdateRows(kNumRows, /*even=*/false));
- ASSERT_OK(tablet()->FlushBiggestDMS());
+ ASSERT_OK(tablet()->FlushBiggestDMSForTests());
// Major compact some columns.
vector<ColumnId> col_ids = { schema_.column_id(1),
@@ -233,12 +233,12 @@ TEST_F(TestMajorDeltaCompaction, TestCompact) {
NO_FATALS(VerifyData());
// Flush the deltas, make sure data stays the same.
- ASSERT_OK(tablet()->FlushBiggestDMS());
+ ASSERT_OK(tablet()->FlushBiggestDMSForTests());
NO_FATALS(VerifyData());
// Update the odd rows and flush deltas
NO_FATALS(UpdateRows(kNumRows, true));
- ASSERT_OK(tablet()->FlushBiggestDMS());
+ ASSERT_OK(tablet()->FlushBiggestDMSForTests());
NO_FATALS(VerifyData());
// Major compact some columns.
@@ -272,7 +272,7 @@ TEST_F(TestMajorDeltaCompaction, TestUndos) {
// Flush the DMS, make sure we still see the old data.
NO_FATALS(UpdateRows(kNumRows, false));
- ASSERT_OK(tablet()->FlushBiggestDMS());
+ ASSERT_OK(tablet()->FlushBiggestDMSForTests());
NO_FATALS(VerifyDataWithMvccAndExpectedState(snap, old_state));
// Major compact, check we still have the old data.
@@ -287,7 +287,7 @@ TEST_F(TestMajorDeltaCompaction, TestUndos) {
for (int j = 0; j < 3; j++) {
NO_FATALS(UpdateRows(kNumRows, false));
}
- ASSERT_OK(tablet()->FlushBiggestDMS());
+ ASSERT_OK(tablet()->FlushBiggestDMSForTests());
}
// To complicate things further, only major compact two columns, then verify we can read the old
@@ -310,14 +310,14 @@ TEST_F(TestMajorDeltaCompaction, TestCarryDeletesOver) {
shared_ptr<RowSet> rs = all_rowsets.front();
NO_FATALS(UpdateRows(kNumRows, false));
- ASSERT_OK(tablet()->FlushBiggestDMS());
+ ASSERT_OK(tablet()->FlushBiggestDMSForTests());
MvccSnapshot updates_snap(*tablet()->mvcc_manager());
vector<ExpectedRow> old_state(expected_state_.size());
std::copy(expected_state_.begin(), expected_state_.end(), old_state.begin());
NO_FATALS(DeleteRows(kNumRows));
- ASSERT_OK(tablet()->FlushBiggestDMS());
+ ASSERT_OK(tablet()->FlushBiggestDMSForTests());
vector<ColumnId> col_ids_to_compact = { schema_.column_id(4) };
ASSERT_OK(tablet()->DoMajorDeltaCompaction(col_ids_to_compact, rs));
@@ -355,7 +355,7 @@ TEST_F(TestMajorDeltaCompaction, TestReinserts) {
// the DMS with the deletes so that we can major compact them.
NO_FATALS(DeleteRows(kNumRows)); // Delete 2nd batch.
NO_FATALS(WriteTestTablet(kNumRows)); // 3rd batch.
- ASSERT_OK(tablet()->FlushBiggestDMS());
+ ASSERT_OK(tablet()->FlushBiggestDMSForTests());
// At this point, here's the layout (the 1st batch was discarded during the first flush):
// MRS: 3rd batch of inserts.
@@ -391,7 +391,7 @@ TEST_F(TestMajorDeltaCompaction, TestJustDeletes) {
NO_FATALS(WriteTestTablet(kNumRows));
ASSERT_OK(tablet()->Flush());
NO_FATALS(DeleteRows(kNumRows));
- ASSERT_OK(tablet()->FlushBiggestDMS());
+ ASSERT_OK(tablet()->FlushBiggestDMSForTests());
shared_ptr<RowSet> rs;
ASSERT_EQ(0,
diff --git a/src/kudu/tablet/mt-tablet-test.cc b/src/kudu/tablet/mt-tablet-test.cc
index 05f29b3e0..1e20dad9a 100644
--- a/src/kudu/tablet/mt-tablet-test.cc
+++ b/src/kudu/tablet/mt-tablet-test.cc
@@ -22,6 +22,7 @@
#include <ostream>
#include <string>
#include <thread>
+#include <type_traits>
#include <vector>
#include <gflags/gflags.h>
@@ -306,7 +307,7 @@ class MultiThreadedTabletTest : public TabletTestBase<SETUP> {
}
if (tablet()->DeltaMemStoresSize() > FLAGS_tablet_test_flush_threshold_mb * 1024 * 1024) {
- CHECK_OK(tablet()->FlushBiggestDMS());
+ CHECK_OK(tablet()->FlushBiggestDMSForTests());
}
// Wait, unless the inserters are all done.
@@ -317,7 +318,7 @@ class MultiThreadedTabletTest : public TabletTestBase<SETUP> {
void FlushDeltasThread(int /*tid*/) {
while (running_insert_count_.count() > 0) {
- CHECK_OK(tablet()->FlushBiggestDMS());
+ CHECK_OK(tablet()->FlushBiggestDMSForTests());
// Wait, unless the inserters are all done.
running_insert_count_.WaitFor(kBackgroundOpInterval);
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index 364eaa4ac..c5bc838a5 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -58,7 +58,6 @@
#include "kudu/fs/io_context.h"
#include "kudu/gutil/casts.h"
#include "kudu/gutil/map-util.h"
-#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/human_readable.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/threading/thread_collision_warner.h"
@@ -230,6 +229,7 @@ using kudu::log::LogAnchorRegistry;
using kudu::log::MinLogIndexAnchorer;
using std::endl;
using std::make_shared;
+using std::make_unique;
using std::nullopt;
using std::optional;
using std::ostream;
@@ -347,8 +347,6 @@ Status Tablet::Open(const unordered_set<int64_t>& in_flight_txn_ids,
next_mrs_id_ = metadata_->last_durable_mrs_id() + 1;
- RowSetVector rowsets_opened;
-
// If we persisted the state of any transaction IDs before shutting down,
// initialize those that were in-flight here as kOpen. If there were any ops
// applied that didn't get persisted to the tablet metadata, the bootstrap
@@ -359,6 +357,8 @@ Status Tablet::Open(const unordered_set<int64_t>& in_flight_txn_ids,
fs::IOContext io_context({ tablet_id() });
// open the tablet row-sets
+ RowSetVector rowsets_opened;
+ rowsets_opened.reserve(metadata_->rowsets().size());
for (const shared_ptr<RowSetMetadata>& rowset_meta : metadata_->rowsets()) {
shared_ptr<DiskRowSet> rowset;
Status s = DiskRowSet::Open(rowset_meta,
@@ -372,12 +372,12 @@ Status Tablet::Open(const unordered_set<int64_t>& in_flight_txn_ids,
return s;
}
- rowsets_opened.push_back(rowset);
+ rowsets_opened.emplace_back(std::move(rowset));
}
{
auto new_rowset_tree(make_shared<RowSetTree>());
- CHECK_OK(new_rowset_tree->Reset(rowsets_opened));
+ RETURN_NOT_OK(new_rowset_tree->Reset(rowsets_opened));
// Now that the current state is loaded, create the new MemRowSet with the next id.
shared_ptr<MemRowSet> new_mrs;
@@ -644,17 +644,14 @@ void Tablet::AssignTimestampAndStartOpForTests(WriteOpState* op_state) {
}
void Tablet::StartOp(WriteOpState* op_state) {
- unique_ptr<ScopedOp> mvcc_op;
DCHECK(op_state->has_timestamp());
- mvcc_op.reset(new ScopedOp(&mvcc_, op_state->timestamp()));
- op_state->SetMvccOp(std::move(mvcc_op));
+ op_state->SetMvccOp(make_unique<ScopedOp>(&mvcc_, op_state->timestamp()));
}
void Tablet::StartOp(ParticipantOpState* op_state) {
if (op_state->request()->op().type() == tserver::ParticipantOpPB::BEGIN_COMMIT) {
DCHECK(op_state->has_timestamp());
- unique_ptr<ScopedOp> mvcc_op(new ScopedOp(&mvcc_, op_state->timestamp()));
- op_state->SetMvccOp(std::move(mvcc_op));
+ op_state->SetMvccOp(make_unique<ScopedOp>(&mvcc_, op_state->timestamp()));
}
}
@@ -1809,33 +1806,28 @@ void Tablet::RegisterMaintenanceOps(MaintenanceManager* maint_mgr) {
DCHECK(maintenance_ops_.empty());
}
- vector<MaintenanceOp*> maintenance_ops;
- unique_ptr<MaintenanceOp> rs_compact_op(new CompactRowSetsOp(this));
- maint_mgr->RegisterOp(rs_compact_op.get());
- maintenance_ops.push_back(rs_compact_op.release());
+ vector<unique_ptr<MaintenanceOp>> maintenance_ops;
+ maintenance_ops.emplace_back(new CompactRowSetsOp(this));
+ maint_mgr->RegisterOp(maintenance_ops.back().get());
- unique_ptr<MaintenanceOp> minor_delta_compact_op(new MinorDeltaCompactionOp(this));
- maint_mgr->RegisterOp(minor_delta_compact_op.get());
- maintenance_ops.push_back(minor_delta_compact_op.release());
+ maintenance_ops.emplace_back(new MinorDeltaCompactionOp(this));
+ maint_mgr->RegisterOp(maintenance_ops.back().get());
- unique_ptr<MaintenanceOp> major_delta_compact_op(new MajorDeltaCompactionOp(this));
- maint_mgr->RegisterOp(major_delta_compact_op.get());
- maintenance_ops.push_back(major_delta_compact_op.release());
+ maintenance_ops.emplace_back(new MajorDeltaCompactionOp(this));
+ maint_mgr->RegisterOp(maintenance_ops.back().get());
- unique_ptr<MaintenanceOp> undo_delta_block_gc_op(new UndoDeltaBlockGCOp(this));
- maint_mgr->RegisterOp(undo_delta_block_gc_op.get());
- maintenance_ops.push_back(undo_delta_block_gc_op.release());
+ maintenance_ops.emplace_back(new UndoDeltaBlockGCOp(this));
+ maint_mgr->RegisterOp(maintenance_ops.back().get());
// The deleted rowset GC operation relies on live rowset counting. If this
// tablet doesn't support such counting, do not register the op.
if (metadata_->supports_live_row_count()) {
- unique_ptr<MaintenanceOp> deleted_rowset_gc_op(new DeletedRowsetGCOp(this));
- maint_mgr->RegisterOp(deleted_rowset_gc_op.get());
- maintenance_ops.push_back(deleted_rowset_gc_op.release());
+ maintenance_ops.emplace_back(new DeletedRowsetGCOp(this));
+ maint_mgr->RegisterOp(maintenance_ops.back().get());
}
std::lock_guard<simple_spinlock> l(state_lock_);
- maintenance_ops_.swap(maintenance_ops);
+ maintenance_ops_ = std::move(maintenance_ops);
}
void Tablet::UnregisterMaintenanceOps() {
@@ -1847,21 +1839,24 @@ void Tablet::UnregisterMaintenanceOps() {
// operation to finish in Unregister(), a different one can't get re-scheduled.
CancelMaintenanceOps();
+ decltype(maintenance_ops_) ops;
+ {
+ std::lock_guard<simple_spinlock> l(state_lock_);
+ ops = std::move(maintenance_ops_);
+ maintenance_ops_.clear();
+ }
+
// We don't lock here because unregistering ops may take a long time.
// 'maintenance_registration_fake_lock_' is sufficient to ensure nothing else
// is updating 'maintenance_ops_'.
- for (MaintenanceOp* op : maintenance_ops_) {
+ for (auto& op : ops) {
op->Unregister();
}
-
- // Finally, delete the ops under lock.
- std::lock_guard<simple_spinlock> l(state_lock_);
- STLDeleteElements(&maintenance_ops_);
}
void Tablet::CancelMaintenanceOps() {
std::lock_guard<simple_spinlock> l(state_lock_);
- for (MaintenanceOp* op : maintenance_ops_) {
+ for (auto& op : maintenance_ops_) {
op->CancelAndDisable();
}
}
@@ -1938,12 +1933,16 @@ Status Tablet::DoMergeCompactionOrFlush(const RowSetsInCompaction &input,
// The RollingDiskRowSet writer wrote out one or more RowSets as the
// output. Open these into 'new_rowsets'.
- vector<shared_ptr<RowSet> > new_disk_rowsets;
RowSetMetadataVector new_drs_metas;
drsw.GetWrittenRowSetMetadata(&new_drs_metas);
-
- if (metrics_.get()) metrics_->bytes_flushed->IncrementBy(drsw.written_size());
CHECK(!new_drs_metas.empty());
+
+ if (metrics_) {
+ metrics_->bytes_flushed->IncrementBy(drsw.written_size());
+ }
+
+ vector<shared_ptr<RowSet>> new_disk_rowsets;
+ new_disk_rowsets.reserve(new_drs_metas.size());
{
TRACE_EVENT0("tablet", "Opening compaction results");
for (const shared_ptr<RowSetMetadata>& meta : new_drs_metas) {
@@ -1960,7 +1959,7 @@ Status Tablet::DoMergeCompactionOrFlush(const RowSetsInCompaction &input,
<< meta->ToString() << ": " << s.ToString();
return s;
}
- new_disk_rowsets.push_back(new_rowset);
+ new_disk_rowsets.emplace_back(std::move(new_rowset));
}
}
@@ -1994,7 +1993,7 @@ Status Tablet::DoMergeCompactionOrFlush(const RowSetsInCompaction &input,
"duplicate updates in new rowsets)",
op_name);
shared_ptr<DuplicatingRowSet> inprogress_rowset(
- new DuplicatingRowSet(input.rowsets(), new_disk_rowsets));
+ make_shared<DuplicatingRowSet>(input.rowsets(), new_disk_rowsets));
// The next step is to swap in the DuplicatingRowSet, and at the same time,
// determine an MVCC snapshot which includes all of the ops that saw a
@@ -2564,21 +2563,21 @@ int64_t Tablet::GetReplaySizeForIndex(int64_t min_log_index,
return it->second;
}
-Status Tablet::FlushBiggestDMS() {
+Status Tablet::FlushBiggestDMSForTests() {
RETURN_IF_STOPPED_OR_CHECK_STATE(kOpen);
scoped_refptr<TabletComponents> comps;
GetComponents(&comps);
- int64_t max_size = -1;
+ size_t max_size = 0;
shared_ptr<RowSet> biggest_drs;
- for (const shared_ptr<RowSet> &rowset : comps->rowsets->all_rowsets()) {
- int64_t current = rowset->DeltaMemStoreSize();
+ for (const auto& rowset : comps->rowsets->all_rowsets()) {
+ size_t current = rowset->DeltaMemStoreSize();
if (current > max_size) {
max_size = current;
biggest_drs = rowset;
}
}
- return max_size > 0 ? biggest_drs->FlushDeltas(nullptr) : Status::OK();
+ return biggest_drs ? biggest_drs->FlushDeltas(nullptr) : Status::OK();
}
Status Tablet::FlushAllDMSForTests() {
@@ -2657,7 +2656,7 @@ double Tablet::GetPerfImprovementForBestDeltaCompactUnlocked(RowSet::DeltaCompac
GetComponents(&comps);
double worst_delta_perf = 0;
shared_ptr<RowSet> worst_rs;
- for (const shared_ptr<RowSet> &rowset : comps->rowsets->all_rowsets()) {
+ for (const auto& rowset : comps->rowsets->all_rowsets()) {
if (!rowset->IsAvailableForCompaction()) {
continue;
}
@@ -2668,7 +2667,7 @@ double Tablet::GetPerfImprovementForBestDeltaCompactUnlocked(RowSet::DeltaCompac
}
}
if (rs && worst_delta_perf > 0) {
- *rs = worst_rs;
+ *rs = std::move(worst_rs);
}
return worst_delta_perf;
}
diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h
index 47ca3b5bc..7a56e0ce3 100644
--- a/src/kudu/tablet/tablet.h
+++ b/src/kudu/tablet/tablet.h
@@ -254,7 +254,6 @@ class Tablet {
// in a new (initially empty) MemRowSet in its place.
//
// This doesn't flush any DeltaMemStores for any existing RowSets.
- // To do that, call FlushBiggestDMS() for example.
Status Flush();
// Prepares the op context for the alter schema operation.
@@ -328,8 +327,8 @@ class Tablet {
// Flushes the DMS with the highest retention.
Status FlushBestDMS(const ReplaySizeMap &replay_size_map) const;
- // Flush only the biggest DMS
- Status FlushBiggestDMS();
+ // Flush only the biggest DMS. Only used for tests.
+ Status FlushBiggestDMSForTests();
// Flush all delta memstores. Only used for tests.
Status FlushAllDMSForTests();
@@ -841,7 +840,8 @@ class Tablet {
std::shared_ptr<FlushFaultHooks> flush_hooks_;
std::shared_ptr<FlushCompactCommonHooks> common_hooks_;
- std::vector<MaintenanceOp*> maintenance_ops_;
+ // Tablet owns MaintenanceOp objects allocated on the heap.
+ std::vector<std::unique_ptr<MaintenanceOp>> maintenance_ops_;
// Lock protecting access to 'last_write_time_' and 'last_read_time_'.
mutable rw_spinlock last_rw_time_lock_;
diff --git a/src/kudu/tablet/tablet_history_gc-test.cc b/src/kudu/tablet/tablet_history_gc-test.cc
index d93de757a..fe3ff54d0 100644
--- a/src/kudu/tablet/tablet_history_gc-test.cc
+++ b/src/kudu/tablet/tablet_history_gc-test.cc
@@ -23,6 +23,7 @@
#include <optional>
#include <ostream>
#include <string>
+#include <type_traits>
#include <vector>
#include <gflags/gflags_declare.h>
@@ -32,7 +33,6 @@
#include "kudu/clock/clock.h"
#include "kudu/clock/hybrid_clock.h"
#include "kudu/clock/mock_ntp.h"
-#include "kudu/clock/time_service.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/partial_row.h"
#include "kudu/common/schema.h"
@@ -47,7 +47,6 @@
#include "kudu/tablet/diskrowset.h"
#include "kudu/tablet/local_tablet_writer.h"
#include "kudu/tablet/rowset.h"
-#include "kudu/tablet/rowset_metadata.h"
#include "kudu/tablet/tablet-harness.h"
#include "kudu/tablet/tablet-test-base.h"
#include "kudu/tablet/tablet-test-util.h"
@@ -207,7 +206,7 @@ TEST_F(TabletHistoryGcTest, TestNoGenerateUndoOnMajorDeltaCompaction) {
}
// We must flush the DMS before major compaction can operate on these REDOs.
for (int i = 0; i < kNumRowsets; i++) {
- tablet()->FlushBiggestDMS();
+ ASSERT_OK(tablet()->FlushBiggestDMSForTests());
}
post_update_ts[val - 1] = clock()->Now();
}
@@ -260,7 +259,7 @@ TEST_F(TabletHistoryGcTest, TestMajorDeltaCompactionOnSubsetOfColumns) {
ASSERT_OK_FAST(writer.Update(row));
}
for (int i = 0; i < kNumRowsets; i++) {
- tablet()->FlushBiggestDMS();
+ ASSERT_OK(tablet()->FlushBiggestDMSForTests());
}
NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(200)));
diff --git a/src/kudu/tablet/tablet_replica-test.cc b/src/kudu/tablet/tablet_replica-test.cc
index 4a5976ab5..bd6670ce9 100644
--- a/src/kudu/tablet/tablet_replica-test.cc
+++ b/src/kudu/tablet/tablet_replica-test.cc
@@ -40,7 +40,7 @@
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/log.h"
#include "kudu/consensus/log_anchor_registry.h"
-#include "kudu/consensus/log_reader.h"
+#include "kudu/consensus/log_reader.h" // IWYU pragma: keep
#include "kudu/consensus/log_util.h"
#include "kudu/consensus/opid.pb.h"
#include "kudu/consensus/opid_util.h"
@@ -363,7 +363,7 @@ TEST_F(TabletReplicaTest, TestDMSAnchorPreventsLogGC) {
ASSERT_EQ(5, segments.size());
// Flush DMS to release the anchor.
- tablet_replica_->tablet()->FlushBiggestDMS();
+ ASSERT_OK(tablet_replica_->tablet()->FlushBiggestDMSForTests());
// Verify no anchors after Flush().
ASSERT_EVENTUALLY([&]{ AssertNoLogAnchors(); });
@@ -458,7 +458,7 @@ TEST_F(TabletReplicaTest, TestActiveOpPreventsLogGC) {
log->reader()->GetSegmentsSnapshot(&segments);
ASSERT_EQ(5, segments.size());
ASSERT_EQ(1, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests());
- tablet_replica_->tablet()->FlushBiggestDMS();
+ ASSERT_OK(tablet_replica_->tablet()->FlushBiggestDMSForTests());
ASSERT_EVENTUALLY([&]{
AssertNoLogAnchors();
@@ -481,7 +481,7 @@ TEST_F(TabletReplicaTest, TestActiveOpPreventsLogGC) {
rpc_latch.Wait();
tablet_replica_->op_tracker_.WaitForAllToFinish();
ASSERT_EQ(0, tablet_replica_->op_tracker_.GetNumPendingForTests());
- tablet_replica_->tablet()->FlushBiggestDMS();
+ ASSERT_OK(tablet_replica_->tablet()->FlushBiggestDMSForTests());
ASSERT_EVENTUALLY([&]{ AssertNoLogAnchors(); });
// All should be deleted except the two last segments.
@@ -682,7 +682,7 @@ TEST_F(TabletReplicaTest, TestRestartAfterGCDeletedRowsets) {
ASSERT_EQ(0, live_row_count->value());
// Now do that again but with deltafiles.
- ASSERT_OK(tablet->FlushBiggestDMS());
+ ASSERT_OK(tablet->FlushBiggestDMSForTests());
ASSERT_OK(RestartReplica());
tablet = tablet_replica_->tablet();
ASSERT_EQ(1, tablet->num_rowsets());
diff --git a/src/kudu/tablet/tablet_replica.cc b/src/kudu/tablet/tablet_replica.cc
index 5c6759c7b..a7119c7ac 100644
--- a/src/kudu/tablet/tablet_replica.cc
+++ b/src/kudu/tablet/tablet_replica.cc
@@ -49,7 +49,6 @@
#include "kudu/gutil/basictypes.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
-#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/rpc/result_tracker.h"
#include "kudu/rpc/rpc_header.pb.h"
@@ -58,6 +57,7 @@
#include "kudu/tablet/ops/op_driver.h"
#include "kudu/tablet/ops/participant_op.h"
#include "kudu/tablet/ops/write_op.h"
+#include "kudu/tablet/tablet.h"
#include "kudu/tablet/tablet.pb.h"
#include "kudu/tablet/tablet_replica_mm_ops.h"
#include "kudu/tablet/txn_coordinator.h"
@@ -937,19 +937,16 @@ void TabletReplica::RegisterMaintenanceOps(MaintenanceManager* maint_mgr) {
return;
}
- vector<MaintenanceOp*> maintenance_ops;
+ vector<unique_ptr<MaintenanceOp>> maintenance_ops;
- unique_ptr<MaintenanceOp> mrs_flush_op(new FlushMRSOp(this));
- maint_mgr->RegisterOp(mrs_flush_op.get());
- maintenance_ops.push_back(mrs_flush_op.release());
+ maintenance_ops.emplace_back(new FlushMRSOp(this));
+ maint_mgr->RegisterOp(maintenance_ops.back().get());
- unique_ptr<MaintenanceOp> dms_flush_op(new FlushDeltaMemStoresOp(this));
- maint_mgr->RegisterOp(dms_flush_op.get());
- maintenance_ops.push_back(dms_flush_op.release());
+ maintenance_ops.emplace_back(new FlushDeltaMemStoresOp(this));
+ maint_mgr->RegisterOp(maintenance_ops.back().get());
- unique_ptr<MaintenanceOp> log_gc(new LogGCOp(this));
- maint_mgr->RegisterOp(log_gc.get());
- maintenance_ops.push_back(log_gc.release());
+ maintenance_ops.emplace_back(new LogGCOp(this));
+ maint_mgr->RegisterOp(maintenance_ops.back().get());
std::shared_ptr<Tablet> tablet;
{
@@ -963,22 +960,22 @@ void TabletReplica::RegisterMaintenanceOps(MaintenanceManager* maint_mgr) {
void TabletReplica::CancelMaintenanceOpsForTests() {
std::lock_guard<simple_spinlock> l(lock_);
- for (MaintenanceOp* op : maintenance_ops_) {
+ for (auto& op : maintenance_ops_) {
op->CancelAndDisable();
}
}
void TabletReplica::UnregisterMaintenanceOps() {
DCHECK(state_change_lock_.is_locked());
- vector<MaintenanceOp*> maintenance_ops;
+ decltype(maintenance_ops_) ops;
{
std::lock_guard<simple_spinlock> l(lock_);
- maintenance_ops = std::move(maintenance_ops_);
+ ops = std::move(maintenance_ops_);
+ maintenance_ops_.clear();
}
- for (MaintenanceOp* op : maintenance_ops) {
+ for (auto& op : ops) {
op->Unregister();
}
- STLDeleteElements(&maintenance_ops);
}
size_t TabletReplica::OnDiskSize() const {
@@ -1164,12 +1161,14 @@ void TabletReplica::MakeUnavailable(const Status& error) {
{
std::lock_guard<simple_spinlock> lock(lock_);
tablet = tablet_;
- for (MaintenanceOp* op : maintenance_ops_) {
+ for (auto& op : maintenance_ops_) {
op->CancelAndDisable();
}
}
// Stop the Tablet from doing further I/O.
- if (tablet) tablet->Stop();
+ if (tablet) {
+ tablet->Stop();
+ }
// Set the error; when the replica is shut down, it will end up FAILED.
SetError(error);
diff --git a/src/kudu/tablet/tablet_replica.h b/src/kudu/tablet/tablet_replica.h
index 099111766..09985a822 100644
--- a/src/kudu/tablet/tablet_replica.h
+++ b/src/kudu/tablet/tablet_replica.h
@@ -24,6 +24,7 @@
#include <memory>
#include <mutex>
#include <string>
+#include <type_traits>
#include <unordered_map>
#include <vector>
@@ -40,7 +41,7 @@
#include "kudu/tablet/ops/op.h"
#include "kudu/tablet/ops/op_tracker.h"
#include "kudu/tablet/ops/write_op.h"
-#include "kudu/tablet/tablet.h"
+#include "kudu/tablet/tablet.h" // IWYU pragma: keep
#include "kudu/tablet/tablet_metadata.h"
#include "kudu/tserver/tserver.pb.h"
#include "kudu/util/locks.h"
@@ -602,9 +603,9 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
clock::Clock* clock_;
- // List of maintenance operations for the tablet that need information that only the peer
- // can provide.
- std::vector<MaintenanceOp*> maintenance_ops_;
+ // Maintenance operations for the tablet that need information that only
+ // the peer can provide.
+ std::vector<std::unique_ptr<MaintenanceOp>> maintenance_ops_;
// The result tracker for writes.
scoped_refptr<rpc::ResultTracker> result_tracker_;
diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc
index 0adf63cd7..608c7ecfa 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -33,6 +33,7 @@
#include <sstream>
#include <string>
#include <thread>
+#include <type_traits>
#include <unordered_set>
#include <utility>
#include <vector>
@@ -58,7 +59,6 @@
#include "kudu/consensus/log-test-base.h"
#include "kudu/consensus/log.h"
#include "kudu/consensus/metadata.pb.h"
-#include "kudu/consensus/raft_consensus.h"
#include "kudu/fs/block_id.h"
#include "kudu/fs/block_manager.h"
#include "kudu/fs/data_dirs.h"
@@ -1967,7 +1967,7 @@ TEST_F(TabletServerTest, TestKUDU_176_RecoveryAfterMajorDeltaCompaction) {
// Update it, flush deltas.
ANFF(UpdateTestRowRemote(1, 2));
- ASSERT_OK(tablet_replica_->tablet()->FlushBiggestDMS());
+ ASSERT_OK(tablet_replica_->tablet()->FlushBiggestDMSForTests());
ANFF(VerifyRows(schema_, { KeyValue(1, 2) }));
// Major compact deltas.
@@ -2090,7 +2090,7 @@ TEST_F(TabletServerTest, TestKUDU_177_RecoveryOfDMSEditsAfterMajorDeltaCompactio
// Update it, flush deltas.
ANFF(UpdateTestRowRemote(1, 2));
- ASSERT_OK(tablet_replica_->tablet()->FlushBiggestDMS());
+ ASSERT_OK(tablet_replica_->tablet()->FlushBiggestDMSForTests());
// Update it again, so this last update is in the DMS.
ANFF(UpdateTestRowRemote(1, 3));