You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ab...@apache.org on 2019/01/09 18:11:10 UTC

[kudu] 01/04: KUDU-2656: pass IOContext to ValidateDeltaOrder

This is an automated email from the ASF dual-hosted git repository.

abukor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 17b5efd4784caf6847f3b786632b79b9893a47c1
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Tue Jan 8 00:14:33 2019 -0800

    KUDU-2656: pass IOContext to ValidateDeltaOrder
    
    Previously, checksum errors that occurred during calls to the debug-only
    function ValidateDeltaOrder() would lead to a DCHECK failure. This patch
    plumbs an IOContext to these calls to avoid this and adds a regression
    test for such cases.
    
    Change-Id: I3364ad95a5b3608db6538151007c4b6d16500f2b
    Reviewed-on: http://gerrit.cloudera.org:8080/12178
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
 src/kudu/tablet/delta_compaction.cc            |  4 ++--
 src/kudu/tablet/delta_tracker.cc               | 21 ++++++++++++------
 src/kudu/tablet/delta_tracker.h                | 10 ++++++++-
 src/kudu/tablet/major_delta_compaction-test.cc | 30 ++++++++++++++++++++++++++
 src/kudu/tablet/tablet.cc                      |  5 +++--
 src/kudu/tablet/tablet.h                       |  3 ++-
 6 files changed, 60 insertions(+), 13 deletions(-)

diff --git a/src/kudu/tablet/delta_compaction.cc b/src/kudu/tablet/delta_compaction.cc
index ff2e071..475306a 100644
--- a/src/kudu/tablet/delta_compaction.cc
+++ b/src/kudu/tablet/delta_compaction.cc
@@ -421,12 +421,12 @@ Status MajorDeltaCompaction::UpdateDeltaTracker(DeltaTracker* tracker,
 
   // Even if we didn't create any new redo blocks, we still need to update the
   // tracker so it removes the included_stores_.
-  tracker->AtomicUpdateStores(included_stores_, new_redo_stores, REDO);
+  tracker->AtomicUpdateStores(included_stores_, new_redo_stores, io_context, REDO);
 
   // We only call AtomicUpdateStores() for UNDOs if we wrote UNDOs. We're not
   // removing stores so we don't need to call it otherwise.
   if (!new_undo_stores.empty()) {
-    tracker->AtomicUpdateStores({}, new_undo_stores, UNDO);
+    tracker->AtomicUpdateStores({}, new_undo_stores, io_context, UNDO);
   }
   return Status::OK();
 }
diff --git a/src/kudu/tablet/delta_tracker.cc b/src/kudu/tablet/delta_tracker.cc
index 8c3df30..9b4fd2b 100644
--- a/src/kudu/tablet/delta_tracker.cc
+++ b/src/kudu/tablet/delta_tracker.cc
@@ -204,8 +204,10 @@ string JoinDeltaStoreStrings(const SharedDeltaStoreVector& stores) {
 
 } // anonymous namespace
 
+#ifndef NDEBUG
 Status DeltaTracker::ValidateDeltaOrder(const std::shared_ptr<DeltaStore>& first,
                                         const std::shared_ptr<DeltaStore>& second,
+                                        const IOContext* io_context,
                                         DeltaType type) {
   shared_ptr<DeltaStore> first_copy = first;
   shared_ptr<DeltaStore> second_copy = second;
@@ -216,14 +218,14 @@ Status DeltaTracker::ValidateDeltaOrder(const std::shared_ptr<DeltaStore>& first
     shared_ptr<DeltaFileReader> first_clone;
     RETURN_NOT_OK(down_cast<DeltaFileReader*>(first.get())->CloneForDebugging(
         rowset_metadata_->fs_manager(), mem_trackers_.tablet_tracker, &first_clone));
-    RETURN_NOT_OK(first_clone->Init(nullptr));
+    RETURN_NOT_OK(first_clone->Init(io_context));
     first_copy = first_clone;
   }
   if (!second_copy->Initted()) {
     shared_ptr<DeltaFileReader> second_clone;
     RETURN_NOT_OK(down_cast<DeltaFileReader*>(second.get())->CloneForDebugging(
         rowset_metadata_->fs_manager(), mem_trackers_.tablet_tracker, &second_clone));
-    RETURN_NOT_OK(second_clone->Init(nullptr));
+    RETURN_NOT_OK(second_clone->Init(io_context));
     second_copy = second_clone;
   }
 
@@ -244,15 +246,19 @@ Status DeltaTracker::ValidateDeltaOrder(const std::shared_ptr<DeltaStore>& first
   return Status::OK();
 }
 
-Status DeltaTracker::ValidateDeltasOrdered(const SharedDeltaStoreVector& list, DeltaType type) {
+Status DeltaTracker::ValidateDeltasOrdered(const SharedDeltaStoreVector& list,
+                                           const IOContext* io_context,
+                                           DeltaType type) {
   for (size_t i = 1; i < list.size(); i++) {
-    RETURN_NOT_OK(ValidateDeltaOrder(list[i - 1], list[i], type));
+    RETURN_NOT_OK(ValidateDeltaOrder(list[i - 1], list[i], io_context, type));
   }
   return Status::OK();
 }
+#endif // NDEBUG
 
 void DeltaTracker::AtomicUpdateStores(const SharedDeltaStoreVector& stores_to_replace,
                                       const SharedDeltaStoreVector& new_stores,
+                                      const IOContext* io_context,
                                       DeltaType type) {
   std::lock_guard<rw_spinlock> lock(component_lock_);
   SharedDeltaStoreVector* stores_to_update =
@@ -299,11 +305,12 @@ void DeltaTracker::AtomicUpdateStores(const SharedDeltaStoreVector& stores_to_re
     // Make sure the new stores are already ordered. Non-OK Statuses here don't
     // indicate ordering errors, but rather, physical errors (e.g. disk
     // failure). These don't indicate incorrectness and are thusly ignored.
-    WARN_NOT_OK(ValidateDeltasOrdered(new_stores, type), "Could not validate delta order");
+    WARN_NOT_OK(ValidateDeltasOrdered(new_stores, io_context, type),
+                "Could not validate delta order");
     if (start_it != stores_to_update->end()) {
       // Sanity check that the last store we are adding would logically appear
       // before the first store that would follow it.
-      WARN_NOT_OK(ValidateDeltaOrder(*new_stores.rbegin(), *start_it, type),
+      WARN_NOT_OK(ValidateDeltaOrder(*new_stores.rbegin(), *start_it, io_context, type),
                   "Could not validate delta order");
     }
   }
@@ -343,7 +350,7 @@ Status DeltaTracker::CommitDeltaStoreMetadataUpdate(const RowSetMetadataUpdate&
   rowset_metadata_->AddOrphanedBlocks(removed_blocks);
   // Once we successfully commit to the rowset metadata, let's ensure we update
   // the delta stores to maintain consistency between the two.
-  AtomicUpdateStores(to_remove, new_stores, type);
+  AtomicUpdateStores(to_remove, new_stores, io_context, type);
 
   if (flush_type == FLUSH_METADATA) {
     // Flushing the metadata is considered best-effort in this function.
diff --git a/src/kudu/tablet/delta_tracker.h b/src/kudu/tablet/delta_tracker.h
index f1bf87a..edac431 100644
--- a/src/kudu/tablet/delta_tracker.h
+++ b/src/kudu/tablet/delta_tracker.h
@@ -195,6 +195,7 @@ class DeltaTracker {
                           std::vector<std::shared_ptr<DeltaStore>>* stores,
                           DeltaType type);
 
+#ifndef NDEBUG
   // Validates that 'first' may precede 'second' in an ordered list of deltas,
   // given a delta type of 'type'. This should only be run in DEBUG mode.
   //
@@ -202,6 +203,7 @@ class DeltaTracker {
   // validation could not be performed.
   Status ValidateDeltaOrder(const std::shared_ptr<DeltaStore>& first,
                             const std::shared_ptr<DeltaStore>& second,
+                            const fs::IOContext* io_context,
                             DeltaType type);
 
   // Validates the relative ordering of the deltas in the specified list. This
@@ -209,14 +211,20 @@ class DeltaTracker {
   //
   // Crashes if there is an ordering violation, and returns an error if the
   // validation could not be performed.
-  Status ValidateDeltasOrdered(const SharedDeltaStoreVector& list, DeltaType type);
+  Status ValidateDeltasOrdered(const SharedDeltaStoreVector& list,
+                               const fs::IOContext* io_context,
+                               DeltaType type);
+#endif // NDEBUG
 
   // Replaces the subsequence of stores that matches 'stores_to_replace' with
   // delta file readers corresponding to 'new_delta_blocks', which may be empty.
   // If 'stores_to_replace' is empty then the stores represented by
   // 'new_delta_blocks' are prepended to the relevant delta stores list.
+  //
+  // In DEBUG mode, this may do IO, using 'io_context', to validate the delta ordering.
   void AtomicUpdateStores(const SharedDeltaStoreVector& stores_to_replace,
                           const SharedDeltaStoreVector& new_stores,
+                          const fs::IOContext* io_context,
                           DeltaType type);
 
   // Get the delta MemStore's size in bytes, including pre-allocation.
diff --git a/src/kudu/tablet/major_delta_compaction-test.cc b/src/kudu/tablet/major_delta_compaction-test.cc
index 84671a2..69f5c07 100644
--- a/src/kudu/tablet/major_delta_compaction-test.cc
+++ b/src/kudu/tablet/major_delta_compaction-test.cc
@@ -23,6 +23,7 @@
 #include <unordered_set>
 #include <vector>
 
+#include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
@@ -30,6 +31,7 @@
 #include "kudu/common/iterator.h"
 #include "kudu/common/partial_row.h"
 #include "kudu/common/schema.h"
+#include "kudu/fs/io_context.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/stringprintf.h"
@@ -42,6 +44,8 @@
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 
+DECLARE_double(cfile_inject_corruption);
+
 using std::shared_ptr;
 using std::string;
 using std::unordered_set;
@@ -176,6 +180,32 @@ class TestMajorDeltaCompaction : public KuduRowSetTest {
   vector<ExpectedRow> expected_state_;
 };
 
+// Regression test for KUDU-2656, wherein a checksum error hit during a major
+// delta compaction would trigger a DCHECK failure in debug builds.
+TEST_F(TestMajorDeltaCompaction, TestKudu2656) {
+  constexpr int kNumRows = 100;
+  NO_FATALS(WriteTestTablet(kNumRows));
+  ASSERT_OK(tablet()->Flush());
+  vector<shared_ptr<RowSet>> all_rowsets;
+  tablet()->GetRowSetsForTests(&all_rowsets);
+  ASSERT_FALSE(all_rowsets.empty());
+  shared_ptr<RowSet> rs = all_rowsets.front();
+  // Create some on-disk deltas.
+  NO_FATALS(UpdateRows(kNumRows, /*even=*/false));
+  ASSERT_OK(tablet()->FlushBiggestDMS());
+
+  // Major compact some columns.
+  vector<ColumnId> col_ids = { schema_.column_id(1),
+                               schema_.column_id(3),
+                               schema_.column_id(4) };
+
+  // Injecting corruption should result in a Corruption status, not a crash.
+  FLAGS_cfile_inject_corruption = 1.0;
+  fs::IOContext io_context({ "test-tablet" });
+  Status s = tablet()->DoMajorDeltaCompaction(col_ids, rs, &io_context);
+  ASSERT_TRUE(s.IsCorruption()) << s.ToString();
+}
+
 // Tests a major delta compaction run.
 // Verifies that the output rowset accurately reflects the mutations, but keeps the
 // unchanged columns intact.
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index 0d36f4c..40b728f 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -1083,10 +1083,11 @@ void Tablet::AtomicSwapRowSetsUnlocked(const RowSetVector &to_remove,
 }
 
 Status Tablet::DoMajorDeltaCompaction(const vector<ColumnId>& col_ids,
-                                      const shared_ptr<RowSet>& input_rs) {
+                                      const shared_ptr<RowSet>& input_rs,
+                                      const IOContext* io_context) {
   RETURN_IF_STOPPED_OR_CHECK_STATE(kOpen);
   Status s = down_cast<DiskRowSet*>(input_rs.get())
-      ->MajorCompactDeltaStoresWithColumnIds(col_ids, nullptr, GetHistoryGcOpts());
+      ->MajorCompactDeltaStoresWithColumnIds(col_ids, io_context, GetHistoryGcOpts());
   return s;
 }
 
diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h
index 4e3fadc..9853191 100644
--- a/src/kudu/tablet/tablet.h
+++ b/src/kudu/tablet/tablet.h
@@ -385,7 +385,8 @@ class Tablet {
   //
   // Only used in tests.
   Status DoMajorDeltaCompaction(const std::vector<ColumnId>& col_ids,
-                                const std::shared_ptr<RowSet>& input_rs);
+                                const std::shared_ptr<RowSet>& input_rs,
+                                const fs::IOContext* io_context = nullptr);
 
   // Calculates the ancient history mark and returns true iff tablet history GC
   // is enabled, which requires the use of a HybridClock.