You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2020/03/21 05:16:54 UTC
[kudu] 03/04: tablet: cache delta stats when flushing a DMS
This is an automated email from the ASF dual-hosted git repository.
awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit f4508ff2ceea073197eb07b85c1c4bd28a9c7b3d
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Tue Mar 17 02:19:02 2020 -0700
tablet: cache delta stats when flushing a DMS
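Caching the delta stats when a DMS is flushed allows us to GC ancient rowsets
that have had their DMS flushed, without first having to read the stats back
from disk. Rowsets with a flushed DMS are likely to be the common case, given
that the default ancient history period is 7 days.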
Change-Id: I26e74467ed180cebbd8da360763ba9498661e5f3
Reviewed-on: http://gerrit.cloudera.org:8080/15460
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>
---
src/kudu/tablet/delta_store.h | 19 +++++++++++-------
src/kudu/tablet/delta_tracker.cc | 14 ++++++-------
src/kudu/tablet/deltafile-test.cc | 2 +-
src/kudu/tablet/deltafile.cc | 15 +++++++++++---
src/kudu/tablet/deltafile.h | 33 ++++++++++++++++++++-----------
src/kudu/tablet/deltamemstore.h | 2 +-
src/kudu/tablet/mt-tablet-test.cc | 4 ++--
src/kudu/tablet/tablet_history_gc-test.cc | 4 ----
src/kudu/tablet/tablet_replica-test.cc | 22 ++++++++++++++++++---
9 files changed, 76 insertions(+), 39 deletions(-)
diff --git a/src/kudu/tablet/delta_store.h b/src/kudu/tablet/delta_store.h
index 069600e..7859e9f 100644
--- a/src/kudu/tablet/delta_store.h
+++ b/src/kudu/tablet/delta_store.h
@@ -14,8 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-#ifndef KUDU_TABLET_DELTA_STORE_H
-#define KUDU_TABLET_DELTA_STORE_H
+#pragma once
#include <cstddef>
#include <cstdint>
@@ -179,7 +178,7 @@ class DeltaStore {
virtual Status Init(const fs::IOContext* io_context) = 0;
// Whether this delta store was initialized or not.
- virtual bool Initted() = 0;
+ virtual bool Initted() const = 0;
// Create a DeltaIterator for the given projection.
//
@@ -204,11 +203,18 @@ class DeltaStore {
virtual std::string ToString() const = 0;
- // TODO remove this once we don't need to have delta_stats for both DMS and DFR. Currently
- // DeltaTracker#GetColumnsIdxWithUpdates() needs to filter out DMS from the redo list but it
- // can't without RTTI.
+ // TODO(jdcryans): remove this once we don't need to have delta_stats for
+ // both DMS and DFR. Currently DeltaTracker#GetColumnsIdxWithUpdates() needs
+ // to filter out DMS from the redo list but it can't without RTTI.
virtual const DeltaStats& delta_stats() const = 0;
+ // Returns whether callers can use 'delta_stats()', either because they've
+ // been read from disk, or because the store has been initialized with the
+ // stats cached.
+ virtual bool has_delta_stats() const {
+ return Initted();
+ }
+
virtual ~DeltaStore() {}
};
@@ -558,4 +564,3 @@ Status WriteDeltaIteratorToFile(DeltaIterator* iter,
} // namespace tablet
} // namespace kudu
-#endif
diff --git a/src/kudu/tablet/delta_tracker.cc b/src/kudu/tablet/delta_tracker.cc
index 7b53fc8..2e8c2ea 100644
--- a/src/kudu/tablet/delta_tracker.cc
+++ b/src/kudu/tablet/delta_tracker.cc
@@ -122,6 +122,7 @@ Status DeltaTracker::OpenDeltaReaders(const vector<BlockId>& blocks,
s = DeltaFileReader::OpenNoInit(std::move(block),
type,
std::move(options),
+ /*delta_stats*/nullptr,
&dfr);
if (!s.ok()) {
LOG_WITH_PREFIX(ERROR) << "Failed to open " << DeltaType_Name(type)
@@ -193,7 +194,7 @@ Status DeltaTracker::MakeDeltaIteratorMergerUnlocked(const IOContext* io_context
ignore_result(down_cast<DeltaFileReader*>(delta_store.get()));
shared_ptr<DeltaFileReader> dfr = std::static_pointer_cast<DeltaFileReader>(delta_store);
- if (dfr->Initted()) {
+ if (dfr->has_delta_stats()) {
delete_count += dfr->delta_stats().delete_count();
reinsert_count += dfr->delta_stats().reinsert_count();
update_count += dfr->delta_stats().UpdateCount();
@@ -461,9 +462,7 @@ bool DeltaTracker::EstimateAllRedosAreAncient(Timestamp ancient_history_mark) {
if (!redo_delta_stores_.empty()) {
newest_redo = redo_delta_stores_.back();
}
- // TODO(awong): keep the delta stats cached after flushing a DMS so a flush
- // doesn't invalidate this rowset.
- return newest_redo && newest_redo->Initted() &&
+ return newest_redo && newest_redo->has_delta_stats() &&
newest_redo->delta_stats().max_timestamp() < ancient_history_mark;
}
@@ -477,7 +476,7 @@ Status DeltaTracker::EstimateBytesInPotentiallyAncientUndoDeltas(Timestamp ancie
int64_t tmp_bytes = 0;
for (const auto& undo : boost::adaptors::reverse(undos_newest_first)) {
// Short-circuit once we hit an initialized delta block with 'max_timestamp' > AHM.
- if (undo->Initted() &&
+ if (undo->has_delta_stats() &&
undo->delta_stats().max_timestamp() >= ancient_history_mark) {
break;
}
@@ -552,7 +551,7 @@ Status DeltaTracker::DeleteAncientUndoDeltas(Timestamp ancient_history_mark,
// Traverse oldest-first.
for (auto& undo : boost::adaptors::reverse(undos_newest_first)) {
- if (!undo->Initted()) break; // Never initialize the deltas in this code path (it's slow).
+ if (!undo->has_delta_stats()) break;
if (undo->delta_stats().max_timestamp() >= ancient_history_mark) break;
tmp_blocks_deleted++;
tmp_bytes_deleted += undo->EstimateSize();
@@ -762,6 +761,7 @@ Status DeltaTracker::FlushDMS(DeltaMemStore* dms,
RETURN_NOT_OK(DeltaFileReader::OpenNoInit(std::move(readable_block),
REDO,
std::move(options),
+ std::move(stats),
dfr));
VLOG_WITH_PREFIX(1) << "Opened new delta block " << block_id.ToString() << " for read";
@@ -890,7 +890,7 @@ void DeltaTracker::GetColumnIdsWithUpdates(std::vector<ColumnId>* col_ids) const
set<ColumnId> column_ids_with_updates;
for (const shared_ptr<DeltaStore>& ds : redo_delta_stores_) {
// We won't force open files just to read their stats.
- if (!ds->Initted()) {
+ if (!ds->has_delta_stats()) {
continue;
}
diff --git a/src/kudu/tablet/deltafile-test.cc b/src/kudu/tablet/deltafile-test.cc
index 207b46c..c88396f 100644
--- a/src/kudu/tablet/deltafile-test.cc
+++ b/src/kudu/tablet/deltafile-test.cc
@@ -374,7 +374,7 @@ TEST_F(TestDeltaFile, TestLazyInit) {
// Lazily opening the delta file should not trigger any reads.
shared_ptr<DeltaFileReader> reader;
ASSERT_OK(DeltaFileReader::OpenNoInit(
- std::move(count_block), REDO, ReaderOptions(), &reader));
+ std::move(count_block), REDO, ReaderOptions(), /*delta_stats*/nullptr, &reader));
ASSERT_EQ(0, bytes_read);
// But initializing it should (only the first time).
diff --git a/src/kudu/tablet/deltafile.cc b/src/kudu/tablet/deltafile.cc
index 227429a..5e83342 100644
--- a/src/kudu/tablet/deltafile.cc
+++ b/src/kudu/tablet/deltafile.cc
@@ -40,6 +40,7 @@
#include "kudu/fs/block_id.h"
#include "kudu/fs/block_manager.h"
#include "kudu/fs/fs_manager.h"
+#include "kudu/gutil/port.h"
#include "kudu/gutil/stringprintf.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/tablet/delta_relevancy.h"
@@ -217,6 +218,7 @@ Status DeltaFileReader::Open(unique_ptr<ReadableBlock> block,
RETURN_NOT_OK(DeltaFileReader::OpenNoInit(std::move(block),
delta_type,
std::move(options),
+ /*delta_stats*/nullptr,
&df_reader));
RETURN_NOT_OK(df_reader->Init(io_context));
@@ -227,6 +229,7 @@ Status DeltaFileReader::Open(unique_ptr<ReadableBlock> block,
Status DeltaFileReader::OpenNoInit(unique_ptr<ReadableBlock> block,
DeltaType delta_type,
ReaderOptions options,
+ unique_ptr<DeltaStats> delta_stats,
shared_ptr<DeltaFileReader>* reader_out) {
unique_ptr<CFileReader> cf_reader;
const IOContext* io_context = options.io_context;
@@ -234,7 +237,7 @@ Status DeltaFileReader::OpenNoInit(unique_ptr<ReadableBlock> block,
std::move(options),
&cf_reader));
unique_ptr<DeltaFileReader> df_reader(
- new DeltaFileReader(std::move(cf_reader), delta_type));
+ new DeltaFileReader(std::move(cf_reader), std::move(delta_stats), delta_type));
if (!FLAGS_cfile_lazy_open) {
RETURN_NOT_OK(df_reader->Init(io_context));
}
@@ -245,8 +248,10 @@ Status DeltaFileReader::OpenNoInit(unique_ptr<ReadableBlock> block,
}
DeltaFileReader::DeltaFileReader(unique_ptr<CFileReader> cf_reader,
+ unique_ptr<DeltaStats> delta_stats,
DeltaType delta_type)
: reader_(cf_reader.release()),
+ delta_stats_(std::move(delta_stats)),
delta_type_(delta_type) {}
Status DeltaFileReader::Init(const IOContext* io_context) {
@@ -264,7 +269,9 @@ Status DeltaFileReader::InitOnce(const IOContext* io_context) {
}
// Initialize delta file stats
- RETURN_NOT_OK(ReadDeltaStats());
+ if (!has_delta_stats()) {
+ RETURN_NOT_OK(ReadDeltaStats());
+ }
return Status::OK();
}
@@ -280,6 +287,7 @@ Status DeltaFileReader::ReadDeltaStats() {
}
unique_ptr<DeltaStats> stats(new DeltaStats());
RETURN_NOT_OK(stats->InitFromPB(deltastats_pb));
+ std::lock_guard<simple_spinlock> l(stats_lock_);
delta_stats_ = std::move(stats);
return Status::OK();
}
@@ -317,7 +325,8 @@ Status DeltaFileReader::CloneForDebugging(FsManager* fs_manager,
RETURN_NOT_OK(fs_manager->OpenBlock(reader_->block_id(), &block));
ReaderOptions options;
options.parent_mem_tracker = parent_mem_tracker;
- return DeltaFileReader::OpenNoInit(std::move(block), delta_type_, options, out);
+ return DeltaFileReader::OpenNoInit(std::move(block), delta_type_, options,
+ /*delta_stats*/nullptr, out);
}
Status DeltaFileReader::NewDeltaIterator(const RowIteratorOptions& opts,
diff --git a/src/kudu/tablet/deltafile.h b/src/kudu/tablet/deltafile.h
index 68222be..1850d1a 100644
--- a/src/kudu/tablet/deltafile.h
+++ b/src/kudu/tablet/deltafile.h
@@ -20,6 +20,7 @@
#include <cstdint>
#include <deque>
#include <memory>
+#include <mutex>
#include <string>
#include <vector>
@@ -32,12 +33,12 @@
#include "kudu/cfile/cfile_writer.h"
#include "kudu/common/rowid.h"
#include "kudu/gutil/macros.h"
-#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/tablet/delta_key.h"
#include "kudu/tablet/delta_stats.h"
#include "kudu/tablet/delta_store.h"
#include "kudu/util/faststring.h"
+#include "kudu/util/locks.h"
#include "kudu/util/once.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
@@ -147,33 +148,40 @@ class DeltaFileReader : public DeltaStore,
static Status OpenNoInit(std::unique_ptr<fs::ReadableBlock> block,
DeltaType delta_type,
cfile::ReaderOptions options,
+ std::unique_ptr<DeltaStats> delta_stats,
std::shared_ptr<DeltaFileReader>* reader_out);
- virtual Status Init(const fs::IOContext* io_context) OVERRIDE;
+ Status Init(const fs::IOContext* io_context) override;
- virtual bool Initted() OVERRIDE {
+ bool Initted() const override {
return init_once_.init_succeeded();
}
// See DeltaStore::NewDeltaIterator(...)
Status NewDeltaIterator(const RowIteratorOptions& opts,
- std::unique_ptr<DeltaIterator>* iterator) const OVERRIDE;
+ std::unique_ptr<DeltaIterator>* iterator) const override;
// See DeltaStore::CheckRowDeleted
- virtual Status CheckRowDeleted(rowid_t row_idx,
- const fs::IOContext* io_context,
- bool *deleted) const OVERRIDE;
+ Status CheckRowDeleted(rowid_t row_idx,
+ const fs::IOContext* io_context,
+ bool *deleted) const override;
- virtual uint64_t EstimateSize() const OVERRIDE;
+ uint64_t EstimateSize() const override;
const BlockId& block_id() const { return reader_->block_id(); }
- virtual const DeltaStats& delta_stats() const OVERRIDE {
- DCHECK(init_once_.init_succeeded());
+ const DeltaStats& delta_stats() const override {
+ std::lock_guard<simple_spinlock> l(stats_lock_);
+ DCHECK(delta_stats_);
return *delta_stats_;
}
- virtual std::string ToString() const OVERRIDE {
+ bool has_delta_stats() const override {
+ std::lock_guard<simple_spinlock> l(stats_lock_);
+ return delta_stats_ != nullptr;
+ }
+
+ std::string ToString() const override {
if (!init_once_.init_succeeded()) return reader_->ToString();
return strings::Substitute("$0 ($1)", reader_->ToString(), delta_stats_->ToString());
}
@@ -201,6 +209,7 @@ class DeltaFileReader : public DeltaStore,
}
DeltaFileReader(std::unique_ptr<cfile::CFileReader> cf_reader,
+ std::unique_ptr<DeltaStats> delta_stats,
DeltaType delta_type);
// Callback used in 'init_once_' to initialize this delta file.
@@ -209,6 +218,8 @@ class DeltaFileReader : public DeltaStore,
Status ReadDeltaStats();
std::shared_ptr<cfile::CFileReader> reader_;
+
+ mutable simple_spinlock stats_lock_;
std::unique_ptr<DeltaStats> delta_stats_;
// The type of this delta, i.e. UNDO or REDO.
diff --git a/src/kudu/tablet/deltamemstore.h b/src/kudu/tablet/deltamemstore.h
index 4a42551..7603ebc 100644
--- a/src/kudu/tablet/deltamemstore.h
+++ b/src/kudu/tablet/deltamemstore.h
@@ -81,7 +81,7 @@ class DeltaMemStore : public DeltaStore,
virtual Status Init(const fs::IOContext* io_context) OVERRIDE;
- virtual bool Initted() OVERRIDE {
+ virtual bool Initted() const OVERRIDE {
return true;
}
diff --git a/src/kudu/tablet/mt-tablet-test.cc b/src/kudu/tablet/mt-tablet-test.cc
index f40b579..8cf354b 100644
--- a/src/kudu/tablet/mt-tablet-test.cc
+++ b/src/kudu/tablet/mt-tablet-test.cc
@@ -323,11 +323,11 @@ class MultiThreadedTabletTest : public TabletTestBase<SETUP> {
}
}
- void MinorCompactDeltasThread(int tid) {
+ void MinorCompactDeltasThread(int /*tid*/) {
CompactDeltas(RowSet::MINOR_DELTA_COMPACTION);
}
- void MajorCompactDeltasThread(int tid) {
+ void MajorCompactDeltasThread(int /*tid*/) {
CompactDeltas(RowSet::MAJOR_DELTA_COMPACTION);
}
diff --git a/src/kudu/tablet/tablet_history_gc-test.cc b/src/kudu/tablet/tablet_history_gc-test.cc
index b5fa40b..a7e4ea0 100644
--- a/src/kudu/tablet/tablet_history_gc-test.cc
+++ b/src/kudu/tablet/tablet_history_gc-test.cc
@@ -668,10 +668,6 @@ TEST_F(TabletHistoryGcNoMaintMgrTest, TestGCDeletedRowsetsWithRedoFiles) {
NO_FATALS(DeleteOriginalRows(kNumRowsets, rows_per_rowset_, /*flush_dms*/true));
ASSERT_EQ(kNumRowsets, tablet()->CountRedoDeltasForTests());
- // TODO(awong): keep the delta stats cached after flushing a DMS so we don't
- // have to scan to read stats.
- NO_FATALS(VerifyTestRows(0, 0));
-
// We shouldn't have any ancient rowsets since we haven't passed the AHM.
int64_t bytes = 0;
ASSERT_OK(tablet()->GetBytesInAncientDeletedRowsets(&bytes));
diff --git a/src/kudu/tablet/tablet_replica-test.cc b/src/kudu/tablet/tablet_replica-test.cc
index c4b88d3..baad47c 100644
--- a/src/kudu/tablet/tablet_replica-test.cc
+++ b/src/kudu/tablet/tablet_replica-test.cc
@@ -810,7 +810,8 @@ TEST_F(TabletReplicaTest, TestRestartAfterGCDeletedRowsets) {
auto live_row_count = METRIC_live_row_count.InstantiateFunctionGauge(
tablet->GetMetricEntity(), [] () { return 0; });
- // Insert some rows and flush so we get a DRS.
+ // Insert some rows and flush so we get a DRS, and then delete them so we
+ // have an ancient, fully deleted DRS.
ASSERT_OK(ExecuteInsertsAndRollLogs(kNumRows));
ASSERT_OK(tablet->Flush());
ASSERT_OK(ExecuteDeletesAndRollLogs(kNumRows));
@@ -828,14 +829,29 @@ TEST_F(TabletReplicaTest, TestRestartAfterGCDeletedRowsets) {
ASSERT_OK(tablet->DeleteAncientDeletedRowsets());
ASSERT_EQ(1, tablet->num_rowsets());
ASSERT_EQ(kNumRows, live_row_count->value());
+ ASSERT_OK(ExecuteDeletesAndRollLogs(kNumRows));
+ ASSERT_EQ(0, live_row_count->value());
- // Restart and ensure we can get online okay.
+ // Restart and ensure we can rebuild our DMS okay.
NO_FATALS(RestartReplica());
tablet = tablet_replica_->tablet();
ASSERT_EQ(1, tablet->num_rowsets());
live_row_count = METRIC_live_row_count.InstantiateFunctionGauge(
tablet->GetMetricEntity(), [] () { return 0; });
- ASSERT_EQ(kNumRows, live_row_count->value());
+ ASSERT_EQ(0, live_row_count->value());
+
+ // Now do that again but with deltafiles.
+ ASSERT_OK(tablet->FlushBiggestDMS());
+ NO_FATALS(RestartReplica());
+ tablet = tablet_replica_->tablet();
+ ASSERT_EQ(1, tablet->num_rowsets());
+
+ // Wait for our deleted rowset to become ancient. Since we just started up,
+ // we shouldn't have read any delta stats, so running the GC won't pick up
+ // our deleted DRS.
+ SleepFor(MonoDelta::FromSeconds(FLAGS_tablet_history_max_age_sec));
+ ASSERT_OK(tablet->DeleteAncientDeletedRowsets());
+ ASSERT_EQ(1, tablet->num_rowsets());
}
} // namespace tablet