You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by gr...@apache.org on 2019/07/11 13:00:17 UTC

[kudu] 02/02: KUDU-2855 Lazy-create DeltaMemStore on first update

This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 8973a285eaebee0736a6dac29de5eca39746eb0c
Author: helifu <hz...@corp.netease.com>
AuthorDate: Tue Jul 9 19:11:23 2019 +0800

    KUDU-2855 Lazy-create DeltaMemStore on first update
    
    This patch lazily creates the DeltaMemStore on the first update, to
    save memory and to fast-path out any DMS-related code when no DMS
    exists. The created DeltaMemStore is destroyed on the following flush.
    
    Change-Id: Ie0c565d86647d5144266b30aa6e8572d42db48c6
    Reviewed-on: http://gerrit.cloudera.org:8080/13821
    Tested-by: Kudu Jenkins
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
---
 src/kudu/tablet/delta_tracker.cc       | 84 +++++++++++++++++++++-------------
 src/kudu/tablet/delta_tracker.h        | 18 +++++---
 src/kudu/tablet/diskrowset-test-base.h |  6 ++-
 src/kudu/tablet/diskrowset-test.cc     |  7 +--
 4 files changed, 72 insertions(+), 43 deletions(-)

diff --git a/src/kudu/tablet/delta_tracker.cc b/src/kudu/tablet/delta_tracker.cc
index 9400e32..4ea8684 100644
--- a/src/kudu/tablet/delta_tracker.cc
+++ b/src/kudu/tablet/delta_tracker.cc
@@ -95,7 +95,8 @@ DeltaTracker::DeltaTracker(shared_ptr<RowSetMetadata> rowset_metadata,
       read_only_(false),
       log_anchor_registry_(log_anchor_registry),
       mem_trackers_(std::move(mem_trackers)),
-      dms_empty_(true),
+      next_dms_id_(rowset_metadata_->last_durable_redo_dms_id() + 1),
+      dms_exists_(false),
       deleted_row_count_(0) {}
 
 Status DeltaTracker::OpenDeltaReaders(const vector<BlockId>& blocks,
@@ -151,15 +152,22 @@ Status DeltaTracker::DoOpen(const IOContext* io_context) {
                                  &undo_delta_stores_,
                                  UNDO));
 
-  // the id of the first DeltaMemStore is the max id of the current ones +1
-  RETURN_NOT_OK(DeltaMemStore::Create(rowset_metadata_->last_durable_redo_dms_id() + 1,
+  open_ = true;
+  return Status::OK();
+}
+
+Status DeltaTracker::CreateAndInitDMSUnlocked(const fs::IOContext* io_context) {
+  DCHECK(component_lock_.is_write_locked());
+  shared_ptr<DeltaMemStore> dms;
+  RETURN_NOT_OK(DeltaMemStore::Create(next_dms_id_++,
                                       rowset_metadata_->id(),
                                       log_anchor_registry_,
                                       mem_trackers_.dms_tracker,
-                                      &dms_));
-  RETURN_NOT_OK(dms_->Init(io_context));
+                                      &dms));
+  RETURN_NOT_OK(dms->Init(io_context));
 
-  open_ = true;
+  dms_.swap(dms);
+  dms_exists_.Store(true);
   return Status::OK();
 }
 
@@ -581,7 +589,9 @@ void DeltaTracker::CollectStores(vector<shared_ptr<DeltaStore>>* deltas,
   }
   if (which != UNDOS_ONLY) {
     deltas->insert(deltas->end(), redo_delta_stores_.begin(), redo_delta_stores_.end());
-    deltas->push_back(dms_);
+    if (dms_exists_.Load() && !dms_->Empty()) {
+      deltas->push_back(dms_);
+    }
   }
 }
 
@@ -637,17 +647,32 @@ Status DeltaTracker::Update(Timestamp timestamp,
                             const RowChangeList &update,
                             const consensus::OpId& op_id,
                             OperationResultPB* result) {
-  // TODO(todd): can probably lock this more fine-grained.
-  shared_lock<rw_spinlock> lock(component_lock_);
+  Status s;
+  while (true) {
+    if (!dms_exists_.Load()) {
+      std::lock_guard<rw_spinlock> lock(component_lock_);
+      // Re-check dms_exists_ under the lock: other blocked threads may have created it.
+      if (!dms_exists_.Load()) {
+        RETURN_NOT_OK(CreateAndInitDMSUnlocked(nullptr));
+      }
+    }
 
-  Status s = dms_->Update(timestamp, row_idx, update, op_id);
-  if (s.ok()) {
-    dms_empty_.Store(false);
+    // TODO(todd): can probably lock this more fine-grained.
+    shared_lock<rw_spinlock> lock(component_lock_);
 
-    MemStoreTargetPB* target = result->add_mutated_stores();
-    target->set_rs_id(rowset_metadata_->id());
-    target->set_dms_id(dms_->id());
+    // Re-check dms_exists_: a concurrent flush may have swapped out dms_ in the
+    // gap between the two critical sections guarded by component_lock_.
+    if (!dms_exists_.Load()) continue;
+
+    s = dms_->Update(timestamp, row_idx, update, op_id);
+    if (s.ok()) {
+      MemStoreTargetPB* target = result->add_mutated_stores();
+      target->set_rs_id(rowset_metadata_->id());
+      target->set_dms_id(dms_->id());
+    }
+    break;
   }
+
   return s;
 }
 
@@ -655,12 +680,13 @@ Status DeltaTracker::CheckRowDeleted(rowid_t row_idx, const IOContext* io_contex
                                      bool *deleted, ProbeStats* stats) const {
   shared_lock<rw_spinlock> lock(component_lock_);
 
-
   *deleted = false;
   // Check if the row has a deletion in DeltaMemStore.
-  RETURN_NOT_OK(dms_->CheckRowDeleted(row_idx, io_context, deleted));
-  if (*deleted) {
-    return Status::OK();
+  if (dms_exists_.Load()) {
+    RETURN_NOT_OK(dms_->CheckRowDeleted(row_idx, io_context, deleted));
+    if (*deleted) {
+      return Status::OK();
+    }
   }
 
   // Then check backwards through the list of trackers.
@@ -744,17 +770,11 @@ Status DeltaTracker::Flush(const IOContext* io_context, MetadataFlushType flush_
     // This shuts out any concurrent readers or writers.
     std::lock_guard<rw_spinlock> lock(component_lock_);
 
-    count = dms_->Count();
+    count = dms_exists_.Load() ? dms_->Count() : 0;
 
-    // Swap the DeltaMemStore to use the new schema
-    old_dms = dms_;
-    RETURN_NOT_OK(DeltaMemStore::Create(old_dms->id() + 1,
-                                        rowset_metadata_->id(),
-                                        log_anchor_registry_,
-                                        mem_trackers_.dms_tracker,
-                                        &dms_));
-    RETURN_NOT_OK(dms_->Init(nullptr));
-    dms_empty_.Store(true);
+    // Swap out the current DeltaMemStore; dms_ is null afterwards.
+    old_dms.swap(dms_);
+    dms_exists_.Store(false);
 
     if (count == 0) {
       // No need to flush if there are no deltas.
@@ -802,12 +822,12 @@ Status DeltaTracker::Flush(const IOContext* io_context, MetadataFlushType flush_
 
 size_t DeltaTracker::DeltaMemStoreSize() const {
   shared_lock<rw_spinlock> lock(component_lock_);
-  return dms_->EstimateSize();
+  return dms_exists_.Load() ? dms_->EstimateSize() : 0;
 }
 
 int64_t DeltaTracker::MinUnflushedLogIndex() const {
   shared_lock<rw_spinlock> lock(component_lock_);
-  return dms_->MinLogIndex();
+  return dms_exists_.Load() ? dms_->MinLogIndex() : 0;
 }
 
 size_t DeltaTracker::CountUndoDeltaStores() const {
@@ -871,7 +891,7 @@ Status DeltaTracker::InitAllDeltaStoresForTests(WhichStores stores) {
 int64_t DeltaTracker::CountDeletedRows() const {
   shared_lock<rw_spinlock> lock(component_lock_);
   DCHECK_GE(deleted_row_count_, 0);
-  return deleted_row_count_ + dms_->deleted_row_count();
+  return deleted_row_count_ + (dms_exists_.Load() ? dms_->deleted_row_count() : 0);
 }
 
 string DeltaTracker::LogPrefix() const {
diff --git a/src/kudu/tablet/delta_tracker.h b/src/kudu/tablet/delta_tracker.h
index f1dabb5..072b6e6 100644
--- a/src/kudu/tablet/delta_tracker.h
+++ b/src/kudu/tablet/delta_tracker.h
@@ -230,9 +230,9 @@ class DeltaTracker {
   // Get the delta MemStore's size in bytes, including pre-allocation.
   size_t DeltaMemStoreSize() const;
 
-  // Returns true if the DMS has no entries. This doesn't rely on the size.
+  // Returns true if the DMS doesn't exist. This doesn't rely on the size.
   bool DeltaMemStoreEmpty() const {
-    return dms_empty_.Load();
+    return !dms_exists_.Load();
   }
 
   // Get the minimum log index for this tracker's DMS, -1 if it wasn't set.
@@ -322,6 +322,8 @@ class DeltaTracker {
 
   std::string LogPrefix() const;
 
+  Status CreateAndInitDMSUnlocked(const fs::IOContext* io_context);
+
   std::shared_ptr<RowSetMetadata> rowset_metadata_;
 
   bool open_;
@@ -337,6 +339,8 @@ class DeltaTracker {
 
   TabletMemTrackers mem_trackers_;
 
+  int64_t next_dms_id_;
+
   // The current DeltaMemStore into which updates should be written.
   std::shared_ptr<DeltaMemStore> dms_;
   // The set of tracked REDO delta stores, in increasing timestamp order.
@@ -345,12 +349,14 @@ class DeltaTracker {
   SharedDeltaStoreVector undo_delta_stores_;
 
   // The maintenance scheduler calls DeltaMemStoreEmpty() a lot.
-  // We cache this here to avoid having to take component_lock_
-  // in order to satisfy this call.
-  AtomicBool dms_empty_;
+  // We use an atomic flag to record whether the DMS exists, which lets
+  // this call be answered without taking component_lock_.
+  AtomicBool dms_exists_;
 
   // read-write lock protecting dms_ and {redo,undo}_delta_stores_.
-  // - Readers and mutators take this lock in shared mode.
+  // - Readers take this lock in shared mode.
+  // - Mutators take this lock in exclusive mode if they need to create
+  //   a new DMS, and shared mode otherwise.
   // - Flushers take this lock in exclusive mode before they modify the
   //   structure of the rowset.
   //
diff --git a/src/kudu/tablet/diskrowset-test-base.h b/src/kudu/tablet/diskrowset-test-base.h
index 3c11c7c..787dcf3 100644
--- a/src/kudu/tablet/diskrowset-test-base.h
+++ b/src/kudu/tablet/diskrowset-test-base.h
@@ -59,7 +59,8 @@ class TestRowSet : public KuduRowSetTest {
     : KuduRowSetTest(CreateTestSchema()),
       n_rows_(FLAGS_roundtrip_num_rows),
       op_id_(consensus::MaximumOpId()),
-      clock_(clock::LogicalClock::CreateStartingAt(Timestamp::kInitialTimestamp)) {
+      clock_(clock::LogicalClock::CreateStartingAt(Timestamp::kInitialTimestamp)),
+      log_anchor_registry_(new log::LogAnchorRegistry()) {
     CHECK_GT(n_rows_, 0);
   }
 
@@ -327,7 +328,7 @@ class TestRowSet : public KuduRowSetTest {
 
   Status OpenTestRowSet(std::shared_ptr<DiskRowSet> *rowset) {
     return DiskRowSet::Open(rowset_meta_,
-                            new log::LogAnchorRegistry(),
+                            log_anchor_registry_.get(),
                             TabletMemTrackers(),
                             nullptr,
                             rowset);
@@ -341,6 +342,7 @@ class TestRowSet : public KuduRowSetTest {
   consensus::OpId op_id_; // Generally a "fake" OpId for these tests.
   scoped_refptr<clock::Clock> clock_;
   MvccManager mvcc_;
+  scoped_refptr<log::LogAnchorRegistry> log_anchor_registry_;
 };
 
 } // namespace tablet
diff --git a/src/kudu/tablet/diskrowset-test.cc b/src/kudu/tablet/diskrowset-test.cc
index 9155e4b..ecb6016 100644
--- a/src/kudu/tablet/diskrowset-test.cc
+++ b/src/kudu/tablet/diskrowset-test.cc
@@ -354,8 +354,8 @@ TEST_F(TestRowSet, TestDMSFlush) {
 
     ASSERT_OK(rs->FlushDeltas(nullptr));
 
-    // Check that the DiskRowSet's DMS has now been emptied.
-    ASSERT_EQ(0, rs->delta_tracker_->dms_->Count());
+    // Check that the flush released the DiskRowSet's DMS (dms_ is now null).
+    ASSERT_FALSE(rs->delta_tracker_->dms_);
 
     // Now read back the value column, and verify that the updates
     // are visible.
@@ -739,6 +739,7 @@ INSTANTIATE_TEST_CASE_P(RowIteratorOptionsPermutations, DiffScanRowSetTest,
 // the test operates on a variety of different on-disk and in-memory layouts.
 TEST_P(DiffScanRowSetTest, TestFuzz) {
   fs::IOContext test_context;
+  scoped_refptr<log::LogAnchorRegistry> log_anchor_registry(new log::LogAnchorRegistry());
 
   // Create and open a DRS with four rows.
   shared_ptr<DiskRowSet> rs;
@@ -756,7 +757,7 @@ TEST_P(DiffScanRowSetTest, TestFuzz) {
       ASSERT_OK(WriteRow(rb.data(), &drsw));
     }
     ASSERT_OK(drsw.Finish());
-    ASSERT_OK(DiskRowSet::Open(rowset_meta_, new log::LogAnchorRegistry(),
+    ASSERT_OK(DiskRowSet::Open(rowset_meta_, log_anchor_registry.get(),
                                TabletMemTrackers(), &test_context, &rs));
   }