You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ab...@apache.org on 2019/01/09 18:11:10 UTC
[kudu] 01/04: KUDU-2656: pass IOContext to ValidateDeltaOrder
This is an automated email from the ASF dual-hosted git repository.
abukor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 17b5efd4784caf6847f3b786632b79b9893a47c1
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Tue Jan 8 00:14:33 2019 -0800
KUDU-2656: pass IOContext to ValidateDeltaOrder
Previously, checksum errors that occurred during calls to the debug-only
function ValidateDeltaOrder() would lead to a DCHECK failure. This patch
plumbs an IOContext through to these calls so that such errors are reported
as errors rather than crashing, and adds a regression test for this case.
Change-Id: I3364ad95a5b3608db6538151007c4b6d16500f2b
Reviewed-on: http://gerrit.cloudera.org:8080/12178
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
src/kudu/tablet/delta_compaction.cc | 4 ++--
src/kudu/tablet/delta_tracker.cc | 21 ++++++++++++------
src/kudu/tablet/delta_tracker.h | 10 ++++++++-
src/kudu/tablet/major_delta_compaction-test.cc | 30 ++++++++++++++++++++++++++
src/kudu/tablet/tablet.cc | 5 +++--
src/kudu/tablet/tablet.h | 3 ++-
6 files changed, 60 insertions(+), 13 deletions(-)
diff --git a/src/kudu/tablet/delta_compaction.cc b/src/kudu/tablet/delta_compaction.cc
index ff2e071..475306a 100644
--- a/src/kudu/tablet/delta_compaction.cc
+++ b/src/kudu/tablet/delta_compaction.cc
@@ -421,12 +421,12 @@ Status MajorDeltaCompaction::UpdateDeltaTracker(DeltaTracker* tracker,
// Even if we didn't create any new redo blocks, we still need to update the
// tracker so it removes the included_stores_.
- tracker->AtomicUpdateStores(included_stores_, new_redo_stores, REDO);
+ tracker->AtomicUpdateStores(included_stores_, new_redo_stores, io_context, REDO);
// We only call AtomicUpdateStores() for UNDOs if we wrote UNDOs. We're not
// removing stores so we don't need to call it otherwise.
if (!new_undo_stores.empty()) {
- tracker->AtomicUpdateStores({}, new_undo_stores, UNDO);
+ tracker->AtomicUpdateStores({}, new_undo_stores, io_context, UNDO);
}
return Status::OK();
}
diff --git a/src/kudu/tablet/delta_tracker.cc b/src/kudu/tablet/delta_tracker.cc
index 8c3df30..9b4fd2b 100644
--- a/src/kudu/tablet/delta_tracker.cc
+++ b/src/kudu/tablet/delta_tracker.cc
@@ -204,8 +204,10 @@ string JoinDeltaStoreStrings(const SharedDeltaStoreVector& stores) {
} // anonymous namespace
+#ifndef NDEBUG
Status DeltaTracker::ValidateDeltaOrder(const std::shared_ptr<DeltaStore>& first,
const std::shared_ptr<DeltaStore>& second,
+ const IOContext* io_context,
DeltaType type) {
shared_ptr<DeltaStore> first_copy = first;
shared_ptr<DeltaStore> second_copy = second;
@@ -216,14 +218,14 @@ Status DeltaTracker::ValidateDeltaOrder(const std::shared_ptr<DeltaStore>& first
shared_ptr<DeltaFileReader> first_clone;
RETURN_NOT_OK(down_cast<DeltaFileReader*>(first.get())->CloneForDebugging(
rowset_metadata_->fs_manager(), mem_trackers_.tablet_tracker, &first_clone));
- RETURN_NOT_OK(first_clone->Init(nullptr));
+ RETURN_NOT_OK(first_clone->Init(io_context));
first_copy = first_clone;
}
if (!second_copy->Initted()) {
shared_ptr<DeltaFileReader> second_clone;
RETURN_NOT_OK(down_cast<DeltaFileReader*>(second.get())->CloneForDebugging(
rowset_metadata_->fs_manager(), mem_trackers_.tablet_tracker, &second_clone));
- RETURN_NOT_OK(second_clone->Init(nullptr));
+ RETURN_NOT_OK(second_clone->Init(io_context));
second_copy = second_clone;
}
@@ -244,15 +246,19 @@ Status DeltaTracker::ValidateDeltaOrder(const std::shared_ptr<DeltaStore>& first
return Status::OK();
}
-Status DeltaTracker::ValidateDeltasOrdered(const SharedDeltaStoreVector& list, DeltaType type) {
+Status DeltaTracker::ValidateDeltasOrdered(const SharedDeltaStoreVector& list,
+ const IOContext* io_context,
+ DeltaType type) {
for (size_t i = 1; i < list.size(); i++) {
- RETURN_NOT_OK(ValidateDeltaOrder(list[i - 1], list[i], type));
+ RETURN_NOT_OK(ValidateDeltaOrder(list[i - 1], list[i], io_context, type));
}
return Status::OK();
}
+#endif // NDEBUG
void DeltaTracker::AtomicUpdateStores(const SharedDeltaStoreVector& stores_to_replace,
const SharedDeltaStoreVector& new_stores,
+ const IOContext* io_context,
DeltaType type) {
std::lock_guard<rw_spinlock> lock(component_lock_);
SharedDeltaStoreVector* stores_to_update =
@@ -299,11 +305,12 @@ void DeltaTracker::AtomicUpdateStores(const SharedDeltaStoreVector& stores_to_re
// Make sure the new stores are already ordered. Non-OK Statuses here don't
// indicate ordering errors, but rather, physical errors (e.g. disk
// failure). These don't indicate incorrectness and are thusly ignored.
- WARN_NOT_OK(ValidateDeltasOrdered(new_stores, type), "Could not validate delta order");
+ WARN_NOT_OK(ValidateDeltasOrdered(new_stores, io_context, type),
+ "Could not validate delta order");
if (start_it != stores_to_update->end()) {
// Sanity check that the last store we are adding would logically appear
// before the first store that would follow it.
- WARN_NOT_OK(ValidateDeltaOrder(*new_stores.rbegin(), *start_it, type),
+ WARN_NOT_OK(ValidateDeltaOrder(*new_stores.rbegin(), *start_it, io_context, type),
"Could not validate delta order");
}
}
@@ -343,7 +350,7 @@ Status DeltaTracker::CommitDeltaStoreMetadataUpdate(const RowSetMetadataUpdate&
rowset_metadata_->AddOrphanedBlocks(removed_blocks);
// Once we successfully commit to the rowset metadata, let's ensure we update
// the delta stores to maintain consistency between the two.
- AtomicUpdateStores(to_remove, new_stores, type);
+ AtomicUpdateStores(to_remove, new_stores, io_context, type);
if (flush_type == FLUSH_METADATA) {
// Flushing the metadata is considered best-effort in this function.
diff --git a/src/kudu/tablet/delta_tracker.h b/src/kudu/tablet/delta_tracker.h
index f1bf87a..edac431 100644
--- a/src/kudu/tablet/delta_tracker.h
+++ b/src/kudu/tablet/delta_tracker.h
@@ -195,6 +195,7 @@ class DeltaTracker {
std::vector<std::shared_ptr<DeltaStore>>* stores,
DeltaType type);
+#ifndef NDEBUG
// Validates that 'first' may precede 'second' in an ordered list of deltas,
// given a delta type of 'type'. This should only be run in DEBUG mode.
//
@@ -202,6 +203,7 @@ class DeltaTracker {
// validation could not be performed.
Status ValidateDeltaOrder(const std::shared_ptr<DeltaStore>& first,
const std::shared_ptr<DeltaStore>& second,
+ const fs::IOContext* io_context,
DeltaType type);
// Validates the relative ordering of the deltas in the specified list. This
@@ -209,14 +211,20 @@ class DeltaTracker {
//
// Crashes if there is an ordering violation, and returns an error if the
// validation could not be performed.
- Status ValidateDeltasOrdered(const SharedDeltaStoreVector& list, DeltaType type);
+ Status ValidateDeltasOrdered(const SharedDeltaStoreVector& list,
+ const fs::IOContext* io_context,
+ DeltaType type);
+#endif // NDEBUG
// Replaces the subsequence of stores that matches 'stores_to_replace' with
// delta file readers corresponding to 'new_delta_blocks', which may be empty.
// If 'stores_to_replace' is empty then the stores represented by
// 'new_delta_blocks' are prepended to the relevant delta stores list.
+ //
+ // In DEBUG mode, this may do IO to validate the delta ordering.
void AtomicUpdateStores(const SharedDeltaStoreVector& stores_to_replace,
const SharedDeltaStoreVector& new_stores,
+ const fs::IOContext* io_context,
DeltaType type);
// Get the delta MemStore's size in bytes, including pre-allocation.
diff --git a/src/kudu/tablet/major_delta_compaction-test.cc b/src/kudu/tablet/major_delta_compaction-test.cc
index 84671a2..69f5c07 100644
--- a/src/kudu/tablet/major_delta_compaction-test.cc
+++ b/src/kudu/tablet/major_delta_compaction-test.cc
@@ -23,6 +23,7 @@
#include <unordered_set>
#include <vector>
+#include <gflags/gflags_declare.h>
#include <glog/logging.h>
#include <gtest/gtest.h>
@@ -30,6 +31,7 @@
#include "kudu/common/iterator.h"
#include "kudu/common/partial_row.h"
#include "kudu/common/schema.h"
+#include "kudu/fs/io_context.h"
#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/stringprintf.h"
@@ -42,6 +44,8 @@
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
+DECLARE_double(cfile_inject_corruption);
+
using std::shared_ptr;
using std::string;
using std::unordered_set;
@@ -176,6 +180,32 @@ class TestMajorDeltaCompaction : public KuduRowSetTest {
vector<ExpectedRow> expected_state_;
};
+// Regression test for KUDU-2656, wherein a corruption during a major delta
+// compaction would lead to a failure in debug mode.
+TEST_F(TestMajorDeltaCompaction, TestKudu2656) {
+ constexpr int kNumRows = 100;
+ NO_FATALS(WriteTestTablet(kNumRows));
+ ASSERT_OK(tablet()->Flush());
+ vector<shared_ptr<RowSet>> all_rowsets;
+ tablet()->GetRowSetsForTests(&all_rowsets);
+ ASSERT_FALSE(all_rowsets.empty());
+ shared_ptr<RowSet> rs = all_rowsets.front();
+ // Create some on-disk deltas.
+ NO_FATALS(UpdateRows(kNumRows, /*even=*/false));
+ ASSERT_OK(tablet()->FlushBiggestDMS());
+
+ // Major compact some columns.
+ vector<ColumnId> col_ids = { schema_.column_id(1),
+ schema_.column_id(3),
+ schema_.column_id(4) };
+
+ // Injecting a failure should result in an error, not a crash.
+ FLAGS_cfile_inject_corruption = 1.0;
+ fs::IOContext io_context({ "test-tablet" });
+ Status s = tablet()->DoMajorDeltaCompaction(col_ids, rs, &io_context);
+ ASSERT_TRUE(s.IsCorruption()) << s.ToString();
+}
+
// Tests a major delta compaction run.
// Verifies that the output rowset accurately reflects the mutations, but keeps the
// unchanged columns intact.
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index 0d36f4c..40b728f 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -1083,10 +1083,11 @@ void Tablet::AtomicSwapRowSetsUnlocked(const RowSetVector &to_remove,
}
Status Tablet::DoMajorDeltaCompaction(const vector<ColumnId>& col_ids,
- const shared_ptr<RowSet>& input_rs) {
+ const shared_ptr<RowSet>& input_rs,
+ const IOContext* io_context) {
RETURN_IF_STOPPED_OR_CHECK_STATE(kOpen);
Status s = down_cast<DiskRowSet*>(input_rs.get())
- ->MajorCompactDeltaStoresWithColumnIds(col_ids, nullptr, GetHistoryGcOpts());
+ ->MajorCompactDeltaStoresWithColumnIds(col_ids, io_context, GetHistoryGcOpts());
return s;
}
diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h
index 4e3fadc..9853191 100644
--- a/src/kudu/tablet/tablet.h
+++ b/src/kudu/tablet/tablet.h
@@ -385,7 +385,8 @@ class Tablet {
//
// Only used in tests.
Status DoMajorDeltaCompaction(const std::vector<ColumnId>& col_ids,
- const std::shared_ptr<RowSet>& input_rs);
+ const std::shared_ptr<RowSet>& input_rs,
+ const fs::IOContext* io_context = nullptr);
// Calculates the ancient history mark and returns true iff tablet history GC
// is enabled, which requires the use of a HybridClock.