You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2020/03/21 05:16:54 UTC

[kudu] 03/04: tablet: cache delta stats when flushing a DMS

This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit f4508ff2ceea073197eb07b85c1c4bd28a9c7b3d
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Tue Mar 17 02:19:02 2020 -0700

    tablet: cache delta stats when flushing a DMS
    
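    Caching the delta stats when a DMS is flushed means the new delta file's
    stats are available without initializing (reading) the file. This allows
    us to GC ancient rowsets that have had their DMS flushed, which is
    likely to be the case, given the default ancient history period is 7
    days.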
    
    Change-Id: I26e74467ed180cebbd8da360763ba9498661e5f3
    Reviewed-on: http://gerrit.cloudera.org:8080/15460
    Tested-by: Kudu Jenkins
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
---
 src/kudu/tablet/delta_store.h             | 19 +++++++++++-------
 src/kudu/tablet/delta_tracker.cc          | 14 ++++++-------
 src/kudu/tablet/deltafile-test.cc         |  2 +-
 src/kudu/tablet/deltafile.cc              | 15 +++++++++++---
 src/kudu/tablet/deltafile.h               | 33 ++++++++++++++++++++-----------
 src/kudu/tablet/deltamemstore.h           |  2 +-
 src/kudu/tablet/mt-tablet-test.cc         |  4 ++--
 src/kudu/tablet/tablet_history_gc-test.cc |  4 ----
 src/kudu/tablet/tablet_replica-test.cc    | 22 ++++++++++++++++++---
 9 files changed, 76 insertions(+), 39 deletions(-)

diff --git a/src/kudu/tablet/delta_store.h b/src/kudu/tablet/delta_store.h
index 069600e..7859e9f 100644
--- a/src/kudu/tablet/delta_store.h
+++ b/src/kudu/tablet/delta_store.h
@@ -14,8 +14,7 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-#ifndef KUDU_TABLET_DELTA_STORE_H
-#define KUDU_TABLET_DELTA_STORE_H
+#pragma once
 
 #include <cstddef>
 #include <cstdint>
@@ -179,7 +178,7 @@ class DeltaStore {
   virtual Status Init(const fs::IOContext* io_context) = 0;
 
   // Whether this delta store was initialized or not.
-  virtual bool Initted() = 0;
+  virtual bool Initted() const = 0;
 
   // Create a DeltaIterator for the given projection.
   //
@@ -204,11 +203,18 @@ class DeltaStore {
 
   virtual std::string ToString() const = 0;
 
-  // TODO remove this once we don't need to have delta_stats for both DMS and DFR. Currently
-  // DeltaTracker#GetColumnsIdxWithUpdates() needs to filter out DMS from the redo list but it
-  // can't without RTTI.
+  // TODO(jdcryans): remove this once we don't need to have delta_stats for
+  // both DMS and DFR. Currently DeltaTracker#GetColumnsIdxWithUpdates() needs
+  // to filter out DMS from the redo list but it can't without RTTI.
   virtual const DeltaStats& delta_stats() const = 0;
 
+  // Returns whether callers can use 'delta_stats()', either because the stats
+  // have been read from disk, or because the store was constructed with the
+  // stats already cached.
+  virtual bool has_delta_stats() const  {
+    return Initted();
+  }
+
   virtual ~DeltaStore() {}
 };
 
@@ -558,4 +564,3 @@ Status WriteDeltaIteratorToFile(DeltaIterator* iter,
 } // namespace tablet
 } // namespace kudu
 
-#endif
diff --git a/src/kudu/tablet/delta_tracker.cc b/src/kudu/tablet/delta_tracker.cc
index 7b53fc8..2e8c2ea 100644
--- a/src/kudu/tablet/delta_tracker.cc
+++ b/src/kudu/tablet/delta_tracker.cc
@@ -122,6 +122,7 @@ Status DeltaTracker::OpenDeltaReaders(const vector<BlockId>& blocks,
     s = DeltaFileReader::OpenNoInit(std::move(block),
                                     type,
                                     std::move(options),
+                                    /*delta_stats*/nullptr,
                                     &dfr);
     if (!s.ok()) {
       LOG_WITH_PREFIX(ERROR) << "Failed to open " << DeltaType_Name(type)
@@ -193,7 +194,7 @@ Status DeltaTracker::MakeDeltaIteratorMergerUnlocked(const IOContext* io_context
     ignore_result(down_cast<DeltaFileReader*>(delta_store.get()));
     shared_ptr<DeltaFileReader> dfr = std::static_pointer_cast<DeltaFileReader>(delta_store);
 
-    if (dfr->Initted()) {
+    if (dfr->has_delta_stats()) {
       delete_count += dfr->delta_stats().delete_count();
       reinsert_count += dfr->delta_stats().reinsert_count();
       update_count += dfr->delta_stats().UpdateCount();
@@ -461,9 +462,7 @@ bool DeltaTracker::EstimateAllRedosAreAncient(Timestamp ancient_history_mark) {
   if (!redo_delta_stores_.empty()) {
     newest_redo = redo_delta_stores_.back();
   }
-  // TODO(awong): keep the delta stats cached after flushing a DMS so a flush
-  // doesn't invalidate this rowset.
-  return newest_redo && newest_redo->Initted() &&
+  return newest_redo && newest_redo->has_delta_stats() &&
       newest_redo->delta_stats().max_timestamp() < ancient_history_mark;
 }
 
@@ -477,7 +476,7 @@ Status DeltaTracker::EstimateBytesInPotentiallyAncientUndoDeltas(Timestamp ancie
   int64_t tmp_bytes = 0;
   for (const auto& undo : boost::adaptors::reverse(undos_newest_first)) {
     // Short-circuit once we hit an initialized delta block with 'max_timestamp' > AHM.
-    if (undo->Initted() &&
+    if (undo->has_delta_stats() &&
         undo->delta_stats().max_timestamp() >= ancient_history_mark) {
       break;
     }
@@ -552,7 +551,7 @@ Status DeltaTracker::DeleteAncientUndoDeltas(Timestamp ancient_history_mark,
 
   // Traverse oldest-first.
   for (auto& undo : boost::adaptors::reverse(undos_newest_first)) {
-    if (!undo->Initted()) break; // Never initialize the deltas in this code path (it's slow).
+    if (!undo->has_delta_stats()) break;
     if (undo->delta_stats().max_timestamp() >= ancient_history_mark) break;
     tmp_blocks_deleted++;
     tmp_bytes_deleted += undo->EstimateSize();
@@ -762,6 +761,7 @@ Status DeltaTracker::FlushDMS(DeltaMemStore* dms,
   RETURN_NOT_OK(DeltaFileReader::OpenNoInit(std::move(readable_block),
                                             REDO,
                                             std::move(options),
+                                            std::move(stats),
                                             dfr));
   VLOG_WITH_PREFIX(1) << "Opened new delta block " << block_id.ToString() << " for read";
 
@@ -890,7 +890,7 @@ void DeltaTracker::GetColumnIdsWithUpdates(std::vector<ColumnId>* col_ids) const
   set<ColumnId> column_ids_with_updates;
   for (const shared_ptr<DeltaStore>& ds : redo_delta_stores_) {
     // We won't force open files just to read their stats.
-    if (!ds->Initted()) {
+    if (!ds->has_delta_stats()) {
       continue;
     }
 
diff --git a/src/kudu/tablet/deltafile-test.cc b/src/kudu/tablet/deltafile-test.cc
index 207b46c..c88396f 100644
--- a/src/kudu/tablet/deltafile-test.cc
+++ b/src/kudu/tablet/deltafile-test.cc
@@ -374,7 +374,7 @@ TEST_F(TestDeltaFile, TestLazyInit) {
   // Lazily opening the delta file should not trigger any reads.
   shared_ptr<DeltaFileReader> reader;
   ASSERT_OK(DeltaFileReader::OpenNoInit(
-      std::move(count_block), REDO, ReaderOptions(), &reader));
+      std::move(count_block), REDO, ReaderOptions(), /*delta_stats*/nullptr, &reader));
   ASSERT_EQ(0, bytes_read);
 
   // But initializing it should (only the first time).
diff --git a/src/kudu/tablet/deltafile.cc b/src/kudu/tablet/deltafile.cc
index 227429a..5e83342 100644
--- a/src/kudu/tablet/deltafile.cc
+++ b/src/kudu/tablet/deltafile.cc
@@ -40,6 +40,7 @@
 #include "kudu/fs/block_id.h"
 #include "kudu/fs/block_manager.h"
 #include "kudu/fs/fs_manager.h"
+#include "kudu/gutil/port.h"
 #include "kudu/gutil/stringprintf.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/tablet/delta_relevancy.h"
@@ -217,6 +218,7 @@ Status DeltaFileReader::Open(unique_ptr<ReadableBlock> block,
   RETURN_NOT_OK(DeltaFileReader::OpenNoInit(std::move(block),
                                             delta_type,
                                             std::move(options),
+                                            /*delta_stats*/nullptr,
                                             &df_reader));
   RETURN_NOT_OK(df_reader->Init(io_context));
 
@@ -227,6 +229,7 @@ Status DeltaFileReader::Open(unique_ptr<ReadableBlock> block,
 Status DeltaFileReader::OpenNoInit(unique_ptr<ReadableBlock> block,
                                    DeltaType delta_type,
                                    ReaderOptions options,
+                                   unique_ptr<DeltaStats> delta_stats,
                                    shared_ptr<DeltaFileReader>* reader_out) {
   unique_ptr<CFileReader> cf_reader;
   const IOContext* io_context = options.io_context;
@@ -234,7 +237,7 @@ Status DeltaFileReader::OpenNoInit(unique_ptr<ReadableBlock> block,
                                         std::move(options),
                                         &cf_reader));
   unique_ptr<DeltaFileReader> df_reader(
-      new DeltaFileReader(std::move(cf_reader), delta_type));
+      new DeltaFileReader(std::move(cf_reader), std::move(delta_stats), delta_type));
   if (!FLAGS_cfile_lazy_open) {
     RETURN_NOT_OK(df_reader->Init(io_context));
   }
@@ -245,8 +248,10 @@ Status DeltaFileReader::OpenNoInit(unique_ptr<ReadableBlock> block,
 }
 
 DeltaFileReader::DeltaFileReader(unique_ptr<CFileReader> cf_reader,
+                                 unique_ptr<DeltaStats> delta_stats,
                                  DeltaType delta_type)
     : reader_(cf_reader.release()),
+      delta_stats_(std::move(delta_stats)),
       delta_type_(delta_type) {}
 
 Status DeltaFileReader::Init(const IOContext* io_context) {
@@ -264,7 +269,9 @@ Status DeltaFileReader::InitOnce(const IOContext* io_context) {
   }
 
   // Initialize delta file stats
-  RETURN_NOT_OK(ReadDeltaStats());
+  if (!has_delta_stats()) {
+    RETURN_NOT_OK(ReadDeltaStats());
+  }
   return Status::OK();
 }
 
@@ -280,6 +287,7 @@ Status DeltaFileReader::ReadDeltaStats() {
   }
   unique_ptr<DeltaStats> stats(new DeltaStats());
   RETURN_NOT_OK(stats->InitFromPB(deltastats_pb));
+  std::lock_guard<simple_spinlock> l(stats_lock_);
   delta_stats_ = std::move(stats);
   return Status::OK();
 }
@@ -317,7 +325,8 @@ Status DeltaFileReader::CloneForDebugging(FsManager* fs_manager,
   RETURN_NOT_OK(fs_manager->OpenBlock(reader_->block_id(), &block));
   ReaderOptions options;
   options.parent_mem_tracker = parent_mem_tracker;
-  return DeltaFileReader::OpenNoInit(std::move(block), delta_type_, options, out);
+  return DeltaFileReader::OpenNoInit(std::move(block), delta_type_, options,
+                                     /*delta_stats*/nullptr, out);
 }
 
 Status DeltaFileReader::NewDeltaIterator(const RowIteratorOptions& opts,
diff --git a/src/kudu/tablet/deltafile.h b/src/kudu/tablet/deltafile.h
index 68222be..1850d1a 100644
--- a/src/kudu/tablet/deltafile.h
+++ b/src/kudu/tablet/deltafile.h
@@ -20,6 +20,7 @@
 #include <cstdint>
 #include <deque>
 #include <memory>
+#include <mutex>
 #include <string>
 #include <vector>
 
@@ -32,12 +33,12 @@
 #include "kudu/cfile/cfile_writer.h"
 #include "kudu/common/rowid.h"
 #include "kudu/gutil/macros.h"
-#include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/tablet/delta_key.h"
 #include "kudu/tablet/delta_stats.h"
 #include "kudu/tablet/delta_store.h"
 #include "kudu/util/faststring.h"
+#include "kudu/util/locks.h"
 #include "kudu/util/once.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
@@ -147,33 +148,40 @@ class DeltaFileReader : public DeltaStore,
   static Status OpenNoInit(std::unique_ptr<fs::ReadableBlock> block,
                            DeltaType delta_type,
                            cfile::ReaderOptions options,
+                           std::unique_ptr<DeltaStats> delta_stats,
                            std::shared_ptr<DeltaFileReader>* reader_out);
 
-  virtual Status Init(const fs::IOContext* io_context) OVERRIDE;
+  Status Init(const fs::IOContext* io_context) override;
 
-  virtual bool Initted() OVERRIDE {
+  bool Initted() const override {
     return init_once_.init_succeeded();
   }
 
   // See DeltaStore::NewDeltaIterator(...)
   Status NewDeltaIterator(const RowIteratorOptions& opts,
-                          std::unique_ptr<DeltaIterator>* iterator) const OVERRIDE;
+                          std::unique_ptr<DeltaIterator>* iterator) const override;
 
   // See DeltaStore::CheckRowDeleted
-  virtual Status CheckRowDeleted(rowid_t row_idx,
-                                 const fs::IOContext* io_context,
-                                 bool *deleted) const OVERRIDE;
+  Status CheckRowDeleted(rowid_t row_idx,
+                         const fs::IOContext* io_context,
+                         bool *deleted) const override;
 
-  virtual uint64_t EstimateSize() const OVERRIDE;
+  uint64_t EstimateSize() const override;
 
   const BlockId& block_id() const { return reader_->block_id(); }
 
-  virtual const DeltaStats& delta_stats() const OVERRIDE {
-    DCHECK(init_once_.init_succeeded());
+  const DeltaStats& delta_stats() const override {
+    std::lock_guard<simple_spinlock> l(stats_lock_);
+    DCHECK(delta_stats_);
     return *delta_stats_;
   }
 
-  virtual std::string ToString() const OVERRIDE {
+  bool has_delta_stats() const override {
+    std::lock_guard<simple_spinlock> l(stats_lock_);
+    return delta_stats_ != nullptr;
+  }
+
+  std::string ToString() const override {
     if (!init_once_.init_succeeded()) return reader_->ToString();
     return strings::Substitute("$0 ($1)", reader_->ToString(), delta_stats_->ToString());
   }
@@ -201,6 +209,7 @@ class DeltaFileReader : public DeltaStore,
   }
 
   DeltaFileReader(std::unique_ptr<cfile::CFileReader> cf_reader,
+                  std::unique_ptr<DeltaStats> delta_stats,
                   DeltaType delta_type);
 
   // Callback used in 'init_once_' to initialize this delta file.
@@ -209,6 +218,8 @@ class DeltaFileReader : public DeltaStore,
   Status ReadDeltaStats();
 
   std::shared_ptr<cfile::CFileReader> reader_;
+
+  mutable simple_spinlock stats_lock_;
   std::unique_ptr<DeltaStats> delta_stats_;
 
   // The type of this delta, i.e. UNDO or REDO.
diff --git a/src/kudu/tablet/deltamemstore.h b/src/kudu/tablet/deltamemstore.h
index 4a42551..7603ebc 100644
--- a/src/kudu/tablet/deltamemstore.h
+++ b/src/kudu/tablet/deltamemstore.h
@@ -81,7 +81,7 @@ class DeltaMemStore : public DeltaStore,
 
   virtual Status Init(const fs::IOContext* io_context) OVERRIDE;
 
-  virtual bool Initted() OVERRIDE {
+  virtual bool Initted() const OVERRIDE {
     return true;
   }
 
diff --git a/src/kudu/tablet/mt-tablet-test.cc b/src/kudu/tablet/mt-tablet-test.cc
index f40b579..8cf354b 100644
--- a/src/kudu/tablet/mt-tablet-test.cc
+++ b/src/kudu/tablet/mt-tablet-test.cc
@@ -323,11 +323,11 @@ class MultiThreadedTabletTest : public TabletTestBase<SETUP> {
     }
   }
 
-  void MinorCompactDeltasThread(int tid) {
+  void MinorCompactDeltasThread(int /*tid*/) {
     CompactDeltas(RowSet::MINOR_DELTA_COMPACTION);
   }
 
-  void MajorCompactDeltasThread(int tid) {
+  void MajorCompactDeltasThread(int /*tid*/) {
     CompactDeltas(RowSet::MAJOR_DELTA_COMPACTION);
   }
 
diff --git a/src/kudu/tablet/tablet_history_gc-test.cc b/src/kudu/tablet/tablet_history_gc-test.cc
index b5fa40b..a7e4ea0 100644
--- a/src/kudu/tablet/tablet_history_gc-test.cc
+++ b/src/kudu/tablet/tablet_history_gc-test.cc
@@ -668,10 +668,6 @@ TEST_F(TabletHistoryGcNoMaintMgrTest, TestGCDeletedRowsetsWithRedoFiles) {
   NO_FATALS(DeleteOriginalRows(kNumRowsets, rows_per_rowset_, /*flush_dms*/true));
   ASSERT_EQ(kNumRowsets, tablet()->CountRedoDeltasForTests());
 
-  // TODO(awong): keep the delta stats cached after flushing a DMS so we don't
-  // have to scan to read stats.
-  NO_FATALS(VerifyTestRows(0, 0));
-
   // We shouldn't have any ancient rowsets since we haven't passed the AHM.
   int64_t bytes = 0;
   ASSERT_OK(tablet()->GetBytesInAncientDeletedRowsets(&bytes));
diff --git a/src/kudu/tablet/tablet_replica-test.cc b/src/kudu/tablet/tablet_replica-test.cc
index c4b88d3..baad47c 100644
--- a/src/kudu/tablet/tablet_replica-test.cc
+++ b/src/kudu/tablet/tablet_replica-test.cc
@@ -810,7 +810,8 @@ TEST_F(TabletReplicaTest, TestRestartAfterGCDeletedRowsets) {
   auto live_row_count = METRIC_live_row_count.InstantiateFunctionGauge(
       tablet->GetMetricEntity(), [] () { return 0; });
 
-  // Insert some rows and flush so we get a DRS.
+  // Insert some rows and flush so we get a DRS, and then delete them so we
+  // have an ancient, fully deleted DRS.
   ASSERT_OK(ExecuteInsertsAndRollLogs(kNumRows));
   ASSERT_OK(tablet->Flush());
   ASSERT_OK(ExecuteDeletesAndRollLogs(kNumRows));
@@ -828,14 +829,29 @@ TEST_F(TabletReplicaTest, TestRestartAfterGCDeletedRowsets) {
   ASSERT_OK(tablet->DeleteAncientDeletedRowsets());
   ASSERT_EQ(1, tablet->num_rowsets());
   ASSERT_EQ(kNumRows, live_row_count->value());
+  ASSERT_OK(ExecuteDeletesAndRollLogs(kNumRows));
+  ASSERT_EQ(0, live_row_count->value());
 
-  // Restart and ensure we can get online okay.
+  // Restart and ensure we can rebuild our DMS okay.
   NO_FATALS(RestartReplica());
   tablet = tablet_replica_->tablet();
   ASSERT_EQ(1, tablet->num_rowsets());
   live_row_count = METRIC_live_row_count.InstantiateFunctionGauge(
       tablet->GetMetricEntity(), [] () { return 0; });
-  ASSERT_EQ(kNumRows, live_row_count->value());
+  ASSERT_EQ(0, live_row_count->value());
+
+  // Now restart again, but with the deletes flushed to a delta file.
+  ASSERT_OK(tablet->FlushBiggestDMS());
+  NO_FATALS(RestartReplica());
+  tablet = tablet_replica_->tablet();
+  ASSERT_EQ(1, tablet->num_rowsets());
+
+  // Wait for our deleted rowset to become ancient. Since we just started up,
+  // we shouldn't have read any delta stats, so running the GC won't pick up
+  // our deleted DRS.
+  SleepFor(MonoDelta::FromSeconds(FLAGS_tablet_history_max_age_sec));
+  ASSERT_OK(tablet->DeleteAncientDeletedRowsets());
+  ASSERT_EQ(1, tablet->num_rowsets());
 }
 
 } // namespace tablet