You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by gr...@apache.org on 2019/07/11 13:00:17 UTC
[kudu] 02/02: KUDU-2855 Lazy-create DeltaMemStore on first update
This is an automated email from the ASF dual-hosted git repository.
granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 8973a285eaebee0736a6dac29de5eca39746eb0c
Author: helifu <hz...@corp.netease.com>
AuthorDate: Tue Jul 9 19:11:23 2019 +0800
KUDU-2855 Lazy-create DeltaMemStore on first update
This patch lazily creates the DeltaMemStore on the first update, to
save memory and to fast-path out of any DMS-related code when no DMS
exists. The created DeltaMemStore is destroyed by the following flush.
Change-Id: Ie0c565d86647d5144266b30aa6e8572d42db48c6
Reviewed-on: http://gerrit.cloudera.org:8080/13821
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>
---
src/kudu/tablet/delta_tracker.cc | 84 +++++++++++++++++++++-------------
src/kudu/tablet/delta_tracker.h | 18 +++++---
src/kudu/tablet/diskrowset-test-base.h | 6 ++-
src/kudu/tablet/diskrowset-test.cc | 7 +--
4 files changed, 72 insertions(+), 43 deletions(-)
diff --git a/src/kudu/tablet/delta_tracker.cc b/src/kudu/tablet/delta_tracker.cc
index 9400e32..4ea8684 100644
--- a/src/kudu/tablet/delta_tracker.cc
+++ b/src/kudu/tablet/delta_tracker.cc
@@ -95,7 +95,8 @@ DeltaTracker::DeltaTracker(shared_ptr<RowSetMetadata> rowset_metadata,
read_only_(false),
log_anchor_registry_(log_anchor_registry),
mem_trackers_(std::move(mem_trackers)),
- dms_empty_(true),
+ next_dms_id_(rowset_metadata_->last_durable_redo_dms_id() + 1),
+ dms_exists_(false),
deleted_row_count_(0) {}
Status DeltaTracker::OpenDeltaReaders(const vector<BlockId>& blocks,
@@ -151,15 +152,22 @@ Status DeltaTracker::DoOpen(const IOContext* io_context) {
&undo_delta_stores_,
UNDO));
- // the id of the first DeltaMemStore is the max id of the current ones +1
- RETURN_NOT_OK(DeltaMemStore::Create(rowset_metadata_->last_durable_redo_dms_id() + 1,
+ open_ = true;
+ return Status::OK();
+}
+
+Status DeltaTracker::CreateAndInitDMSUnlocked(const fs::IOContext* io_context) {
+ DCHECK(component_lock_.is_write_locked());
+ shared_ptr<DeltaMemStore> dms;
+ RETURN_NOT_OK(DeltaMemStore::Create(next_dms_id_++,
rowset_metadata_->id(),
log_anchor_registry_,
mem_trackers_.dms_tracker,
- &dms_));
- RETURN_NOT_OK(dms_->Init(io_context));
+ &dms));
+ RETURN_NOT_OK(dms->Init(io_context));
- open_ = true;
+ dms_.swap(dms);
+ dms_exists_.Store(true);
return Status::OK();
}
@@ -581,7 +589,9 @@ void DeltaTracker::CollectStores(vector<shared_ptr<DeltaStore>>* deltas,
}
if (which != UNDOS_ONLY) {
deltas->insert(deltas->end(), redo_delta_stores_.begin(), redo_delta_stores_.end());
- deltas->push_back(dms_);
+ if (dms_exists_.Load() && !dms_->Empty()) {
+ deltas->push_back(dms_);
+ }
}
}
@@ -637,17 +647,32 @@ Status DeltaTracker::Update(Timestamp timestamp,
const RowChangeList &update,
const consensus::OpId& op_id,
OperationResultPB* result) {
- // TODO(todd): can probably lock this more fine-grained.
- shared_lock<rw_spinlock> lock(component_lock_);
+ Status s;
+ while (true) {
+ if (!dms_exists_.Load()) {
+ std::lock_guard<rw_spinlock> lock(component_lock_);
+ // Should check dms_exists_ here in case multiple threads are blocked.
+ if (!dms_exists_.Load()) {
+ RETURN_NOT_OK(CreateAndInitDMSUnlocked(nullptr));
+ }
+ }
- Status s = dms_->Update(timestamp, row_idx, update, op_id);
- if (s.ok()) {
- dms_empty_.Store(false);
+ // TODO(todd): can probably lock this more fine-grained.
+ shared_lock<rw_spinlock> lock(component_lock_);
- MemStoreTargetPB* target = result->add_mutated_stores();
- target->set_rs_id(rowset_metadata_->id());
- target->set_dms_id(dms_->id());
+ // Should check dms_exists_ here again since there is a gap
+ // between the two critical sections defined by component_lock_.
+ if (!dms_exists_.Load()) continue;
+
+ s = dms_->Update(timestamp, row_idx, update, op_id);
+ if (s.ok()) {
+ MemStoreTargetPB* target = result->add_mutated_stores();
+ target->set_rs_id(rowset_metadata_->id());
+ target->set_dms_id(dms_->id());
+ }
+ break;
}
+
return s;
}
@@ -655,12 +680,13 @@ Status DeltaTracker::CheckRowDeleted(rowid_t row_idx, const IOContext* io_contex
bool *deleted, ProbeStats* stats) const {
shared_lock<rw_spinlock> lock(component_lock_);
-
*deleted = false;
// Check if the row has a deletion in DeltaMemStore.
- RETURN_NOT_OK(dms_->CheckRowDeleted(row_idx, io_context, deleted));
- if (*deleted) {
- return Status::OK();
+ if (dms_exists_.Load()) {
+ RETURN_NOT_OK(dms_->CheckRowDeleted(row_idx, io_context, deleted));
+ if (*deleted) {
+ return Status::OK();
+ }
}
// Then check backwards through the list of trackers.
@@ -744,17 +770,11 @@ Status DeltaTracker::Flush(const IOContext* io_context, MetadataFlushType flush_
// This shuts out any concurrent readers or writers.
std::lock_guard<rw_spinlock> lock(component_lock_);
- count = dms_->Count();
+ count = dms_exists_.Load() ? dms_->Count() : 0;
- // Swap the DeltaMemStore to use the new schema
- old_dms = dms_;
- RETURN_NOT_OK(DeltaMemStore::Create(old_dms->id() + 1,
- rowset_metadata_->id(),
- log_anchor_registry_,
- mem_trackers_.dms_tracker,
- &dms_));
- RETURN_NOT_OK(dms_->Init(nullptr));
- dms_empty_.Store(true);
+ // Swap the DeltaMemStore and dms_ is null now.
+ old_dms.swap(dms_);
+ dms_exists_.Store(false);
if (count == 0) {
// No need to flush if there are no deltas.
@@ -802,12 +822,12 @@ Status DeltaTracker::Flush(const IOContext* io_context, MetadataFlushType flush_
size_t DeltaTracker::DeltaMemStoreSize() const {
shared_lock<rw_spinlock> lock(component_lock_);
- return dms_->EstimateSize();
+ return dms_exists_.Load() ? dms_->EstimateSize() : 0;
}
int64_t DeltaTracker::MinUnflushedLogIndex() const {
shared_lock<rw_spinlock> lock(component_lock_);
- return dms_->MinLogIndex();
+ return dms_exists_.Load() ? dms_->MinLogIndex() : 0;
}
size_t DeltaTracker::CountUndoDeltaStores() const {
@@ -871,7 +891,7 @@ Status DeltaTracker::InitAllDeltaStoresForTests(WhichStores stores) {
int64_t DeltaTracker::CountDeletedRows() const {
shared_lock<rw_spinlock> lock(component_lock_);
DCHECK_GE(deleted_row_count_, 0);
- return deleted_row_count_ + dms_->deleted_row_count();
+ return deleted_row_count_ + (dms_exists_.Load() ? dms_->deleted_row_count() : 0);
}
string DeltaTracker::LogPrefix() const {
diff --git a/src/kudu/tablet/delta_tracker.h b/src/kudu/tablet/delta_tracker.h
index f1dabb5..072b6e6 100644
--- a/src/kudu/tablet/delta_tracker.h
+++ b/src/kudu/tablet/delta_tracker.h
@@ -230,9 +230,9 @@ class DeltaTracker {
// Get the delta MemStore's size in bytes, including pre-allocation.
size_t DeltaMemStoreSize() const;
- // Returns true if the DMS has no entries. This doesn't rely on the size.
+ // Returns true if the DMS doesn't exist. This doesn't rely on the size.
bool DeltaMemStoreEmpty() const {
- return dms_empty_.Load();
+ return !dms_exists_.Load();
}
// Get the minimum log index for this tracker's DMS, -1 if it wasn't set.
@@ -322,6 +322,8 @@ class DeltaTracker {
std::string LogPrefix() const;
+ Status CreateAndInitDMSUnlocked(const fs::IOContext* io_context);
+
std::shared_ptr<RowSetMetadata> rowset_metadata_;
bool open_;
@@ -337,6 +339,8 @@ class DeltaTracker {
TabletMemTrackers mem_trackers_;
+ int64_t next_dms_id_;
+
// The current DeltaMemStore into which updates should be written.
std::shared_ptr<DeltaMemStore> dms_;
// The set of tracked REDO delta stores, in increasing timestamp order.
@@ -345,12 +349,14 @@ class DeltaTracker {
SharedDeltaStoreVector undo_delta_stores_;
// The maintenance scheduler calls DeltaMemStoreEmpty() a lot.
- // We cache this here to avoid having to take component_lock_
- // in order to satisfy this call.
- AtomicBool dms_empty_;
+ // We use an atomic variable to indicate whether DMS exists or not and
+ // to avoid having to take component_lock_ in order to satisfy this call.
+ AtomicBool dms_exists_;
// read-write lock protecting dms_ and {redo,undo}_delta_stores_.
- // - Readers and mutators take this lock in shared mode.
+ // - Readers take this lock in shared mode.
+ // - Mutators take this lock in exclusive mode if they need to create
+ // a new DMS, and shared mode otherwise.
// - Flushers take this lock in exclusive mode before they modify the
// structure of the rowset.
//
diff --git a/src/kudu/tablet/diskrowset-test-base.h b/src/kudu/tablet/diskrowset-test-base.h
index 3c11c7c..787dcf3 100644
--- a/src/kudu/tablet/diskrowset-test-base.h
+++ b/src/kudu/tablet/diskrowset-test-base.h
@@ -59,7 +59,8 @@ class TestRowSet : public KuduRowSetTest {
: KuduRowSetTest(CreateTestSchema()),
n_rows_(FLAGS_roundtrip_num_rows),
op_id_(consensus::MaximumOpId()),
- clock_(clock::LogicalClock::CreateStartingAt(Timestamp::kInitialTimestamp)) {
+ clock_(clock::LogicalClock::CreateStartingAt(Timestamp::kInitialTimestamp)),
+ log_anchor_registry_(new log::LogAnchorRegistry()) {
CHECK_GT(n_rows_, 0);
}
@@ -327,7 +328,7 @@ class TestRowSet : public KuduRowSetTest {
Status OpenTestRowSet(std::shared_ptr<DiskRowSet> *rowset) {
return DiskRowSet::Open(rowset_meta_,
- new log::LogAnchorRegistry(),
+ log_anchor_registry_.get(),
TabletMemTrackers(),
nullptr,
rowset);
@@ -341,6 +342,7 @@ class TestRowSet : public KuduRowSetTest {
consensus::OpId op_id_; // Generally a "fake" OpId for these tests.
scoped_refptr<clock::Clock> clock_;
MvccManager mvcc_;
+ scoped_refptr<log::LogAnchorRegistry> log_anchor_registry_;
};
} // namespace tablet
diff --git a/src/kudu/tablet/diskrowset-test.cc b/src/kudu/tablet/diskrowset-test.cc
index 9155e4b..ecb6016 100644
--- a/src/kudu/tablet/diskrowset-test.cc
+++ b/src/kudu/tablet/diskrowset-test.cc
@@ -354,8 +354,8 @@ TEST_F(TestRowSet, TestDMSFlush) {
ASSERT_OK(rs->FlushDeltas(nullptr));
- // Check that the DiskRowSet's DMS has now been emptied.
- ASSERT_EQ(0, rs->delta_tracker_->dms_->Count());
+ // Check that the DiskRowSet's DMS has not been initialized.
+ ASSERT_FALSE(rs->delta_tracker_->dms_);
// Now read back the value column, and verify that the updates
// are visible.
@@ -739,6 +739,7 @@ INSTANTIATE_TEST_CASE_P(RowIteratorOptionsPermutations, DiffScanRowSetTest,
// the test operates on a variety of different on-disk and in-memory layouts.
TEST_P(DiffScanRowSetTest, TestFuzz) {
fs::IOContext test_context;
+ scoped_refptr<log::LogAnchorRegistry> log_anchor_registry(new log::LogAnchorRegistry());
// Create and open a DRS with four rows.
shared_ptr<DiskRowSet> rs;
@@ -756,7 +757,7 @@ TEST_P(DiffScanRowSetTest, TestFuzz) {
ASSERT_OK(WriteRow(rb.data(), &drsw));
}
ASSERT_OK(drsw.Finish());
- ASSERT_OK(DiskRowSet::Open(rowset_meta_, new log::LogAnchorRegistry(),
+ ASSERT_OK(DiskRowSet::Open(rowset_meta_, log_anchor_registry.get(),
TabletMemTrackers(), &test_context, &rs));
}