You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2017/11/22 06:44:27 UTC

[1/3] kudu git commit: tablet: make various update paths atomic

Repository: kudu
Updated Branches:
  refs/heads/master fe89fa66f -> 8a81f4ff1


tablet: make various update paths atomic

A few codepaths in the tablet subsystem aren't atomic, i.e. if they fail
mid-method (e.g. due to a file error), they leave some in-memory
structures updated and others untouched. This has been safe because we
CHECKed to ensure their success. In preparation for _not_ crashing in
these methods, this patch refactors some of these functions to be
atomic, and notes others that still have the possibility of failing in
such a state (these calls still trigger a CHECK failure).

There are no functional changes in this patch.

Change-Id: I0241cd4206ce77ef1fb334458b091bc2092f4141
Reviewed-on: http://gerrit.cloudera.org:8080/8441
Reviewed-by: Mike Percy <mp...@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/fb0eced0
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/fb0eced0
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/fb0eced0

Branch: refs/heads/master
Commit: fb0eced0cd291c1dd4811dddab9c8ace9787e958
Parents: fe89fa6
Author: Andrew Wong <aw...@cloudera.com>
Authored: Thu Nov 16 17:58:50 2017 -0800
Committer: Andrew Wong <aw...@cloudera.com>
Committed: Wed Nov 22 06:38:04 2017 +0000

----------------------------------------------------------------------
 src/kudu/tablet/delta_compaction.cc | 44 +++++++++------
 src/kudu/tablet/delta_compaction.h  |  2 +-
 src/kudu/tablet/delta_tracker.cc    | 92 +++++++++++++++-----------------
 src/kudu/tablet/delta_tracker.h     | 47 +++++++++-------
 src/kudu/tablet/diskrowset.cc       | 50 +++++++++++------
 src/kudu/tablet/diskrowset.h        |  2 +-
 src/kudu/tablet/metadata-test.cc    | 64 +++++++++++-----------
 src/kudu/tablet/rowset_metadata.cc  | 69 +++++++++++++-----------
 src/kudu/tablet/rowset_metadata.h   | 15 +++++-
 9 files changed, 220 insertions(+), 165 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/fb0eced0/src/kudu/tablet/delta_compaction.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_compaction.cc b/src/kudu/tablet/delta_compaction.cc
index 4c4a09c..969cf89 100644
--- a/src/kudu/tablet/delta_compaction.cc
+++ b/src/kudu/tablet/delta_compaction.cc
@@ -306,7 +306,7 @@ Status MajorDeltaCompaction::Compact() {
   return Status::OK();
 }
 
-Status MajorDeltaCompaction::CreateMetadataUpdate(
+void MajorDeltaCompaction::CreateMetadataUpdate(
     RowSetMetadataUpdate* update) {
   CHECK(update);
   CHECK_EQ(state_, kFinished);
@@ -355,8 +355,6 @@ Status MajorDeltaCompaction::CreateMetadataUpdate(
       }
     }
   }
-
-  return Status::OK();
 }
 
 // We're called under diskrowset's component_lock_ and delta_tracker's compact_flush_lock_
@@ -364,24 +362,40 @@ Status MajorDeltaCompaction::CreateMetadataUpdate(
 // operation.
 Status MajorDeltaCompaction::UpdateDeltaTracker(DeltaTracker* tracker) {
   CHECK_EQ(state_, kFinished);
-  vector<BlockId> new_delta_blocks;
-  // We created a new delta block only if we had deltas to write back. We still need to update
-  // the tracker so that it removes the included_stores_.
+
+  // 1. Get all the necessary I/O out of the way. It's OK to fail here
+  //    because we haven't updated any in-memory state.
+  //
+  // TODO(awong): pull the OpenDeltaReaders() calls out of the critical path of
+  // diskrowset's component_lock_. They touch disk and may block other
+  // diskrowset operations.
+
+  // Create blocks for the new redo deltas.
+  vector<BlockId> new_redo_blocks;
   if (redo_delta_mutations_written_ > 0) {
-    new_delta_blocks.push_back(new_redo_delta_block_);
+    new_redo_blocks.push_back(new_redo_delta_block_);
   }
-  RETURN_NOT_OK(tracker->AtomicUpdateStores(included_stores_,
-                                            new_delta_blocks,
-                                            REDO));
+  SharedDeltaStoreVector new_redo_stores;
+  RETURN_NOT_OK(tracker->OpenDeltaReaders(new_redo_blocks, &new_redo_stores, REDO));
 
-  // We only call AtomicUpdateStores() if we wrote UNDOs, we're not removing stores so we don't
-  // need to call it otherwise.
+  // Create blocks for the new undo deltas.
+  SharedDeltaStoreVector new_undo_stores;
   if (undo_delta_mutations_written_ > 0) {
     vector<BlockId> new_undo_blocks;
     new_undo_blocks.push_back(new_undo_delta_block_);
-    return tracker->AtomicUpdateStores({},
-                                       new_undo_blocks,
-                                       UNDO);
+    RETURN_NOT_OK(tracker->OpenDeltaReaders(new_undo_blocks, &new_undo_stores, UNDO));
+  }
+
+  // 2. Only now that we cannot fail do we update the in-memory state.
+
+  // Even if we didn't create any new redo blocks, we still need to update the
+  // tracker so it removes the included_stores_.
+  tracker->AtomicUpdateStores(included_stores_, new_redo_stores, REDO);
+
+  // We only call AtomicUpdateStores() for UNDOs if we wrote UNDOs. We're not
+  // removing stores so we don't need to call it otherwise.
+  if (!new_undo_stores.empty()) {
+    tracker->AtomicUpdateStores({}, new_undo_stores, UNDO);
   }
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/fb0eced0/src/kudu/tablet/delta_compaction.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_compaction.h b/src/kudu/tablet/delta_compaction.h
index 4397804..6c99d4b 100644
--- a/src/kudu/tablet/delta_compaction.h
+++ b/src/kudu/tablet/delta_compaction.h
@@ -71,7 +71,7 @@ class MajorDeltaCompaction {
   // 1) swaps out the old columns for the new ones
   // 2) removes the compacted deltas
   // 3) adds the new REDO delta which contains any uncompacted deltas
-  Status CreateMetadataUpdate(RowSetMetadataUpdate* update);
+  void CreateMetadataUpdate(RowSetMetadataUpdate* update);
 
   // Apply the changes to the given delta tracker.
   Status UpdateDeltaTracker(DeltaTracker* tracker);

http://git-wip-us.apache.org/repos/asf/kudu/blob/fb0eced0/src/kudu/tablet/delta_tracker.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_tracker.cc b/src/kudu/tablet/delta_tracker.cc
index a7734bf..86cea57 100644
--- a/src/kudu/tablet/delta_tracker.cc
+++ b/src/kudu/tablet/delta_tracker.cc
@@ -199,9 +199,9 @@ string JoinDeltaStoreStrings(const SharedDeltaStoreVector& stores) {
 
 } // anonymous namespace
 
-void DeltaTracker::ValidateDeltaOrder(const std::shared_ptr<DeltaStore>& first,
-                                      const std::shared_ptr<DeltaStore>& second,
-                                      DeltaType type) {
+Status DeltaTracker::ValidateDeltaOrder(const std::shared_ptr<DeltaStore>& first,
+                                        const std::shared_ptr<DeltaStore>& second,
+                                        DeltaType type) {
   shared_ptr<DeltaStore> first_copy = first;
   shared_ptr<DeltaStore> second_copy = second;
 
@@ -209,16 +209,16 @@ void DeltaTracker::ValidateDeltaOrder(const std::shared_ptr<DeltaStore>& first,
   // tests. We know it's a DeltaFileReader if it's not Initted().
   if (!first_copy->Initted()) {
     shared_ptr<DeltaFileReader> first_clone;
-    DCHECK_OK(down_cast<DeltaFileReader*>(first.get())->CloneForDebugging(
+    RETURN_NOT_OK(down_cast<DeltaFileReader*>(first.get())->CloneForDebugging(
         rowset_metadata_->fs_manager(), mem_trackers_.tablet_tracker, &first_clone));
-    DCHECK_OK(first_clone->Init());
+    RETURN_NOT_OK(first_clone->Init());
     first_copy = first_clone;
   }
   if (!second_copy->Initted()) {
     shared_ptr<DeltaFileReader> second_clone;
-    DCHECK_OK(down_cast<DeltaFileReader*>(second.get())->CloneForDebugging(
+    RETURN_NOT_OK(down_cast<DeltaFileReader*>(second.get())->CloneForDebugging(
         rowset_metadata_->fs_manager(), mem_trackers_.tablet_tracker, &second_clone));
-    DCHECK_OK(second_clone->Init());
+    RETURN_NOT_OK(second_clone->Init());
     second_copy = second_clone;
   }
 
@@ -236,21 +236,19 @@ void DeltaTracker::ValidateDeltaOrder(const std::shared_ptr<DeltaStore>& first,
           << second_copy->ToString() << "}]: type = " << type;
       break;
   }
+  return Status::OK();
 }
 
-void DeltaTracker::ValidateDeltasOrdered(const SharedDeltaStoreVector& list, DeltaType type) {
-  for (size_t i = 0; i < list.size() - 1; i++) {
-    ValidateDeltaOrder(list[i], list[i + 1], type);
+Status DeltaTracker::ValidateDeltasOrdered(const SharedDeltaStoreVector& list, DeltaType type) {
+  for (size_t i = 1; i < list.size(); i++) {
+    RETURN_NOT_OK(ValidateDeltaOrder(list[i - 1], list[i], type));
   }
+  return Status::OK();
 }
 
-Status DeltaTracker::AtomicUpdateStores(const SharedDeltaStoreVector& stores_to_replace,
-                                        const vector<BlockId>& new_delta_blocks,
-                                        DeltaType type) {
-  SharedDeltaStoreVector new_stores;
-  RETURN_NOT_OK_PREPEND(OpenDeltaReaders(new_delta_blocks, &new_stores, type),
-                        "Unable to open delta blocks");
-
+void DeltaTracker::AtomicUpdateStores(const SharedDeltaStoreVector& stores_to_replace,
+                                      const SharedDeltaStoreVector& new_stores,
+                                      DeltaType type) {
   std::lock_guard<rw_spinlock> lock(component_lock_);
   SharedDeltaStoreVector* stores_to_update =
       type == REDO ? &redo_delta_stores_ : &undo_delta_stores_;
@@ -274,21 +272,16 @@ Status DeltaTracker::AtomicUpdateStores(const SharedDeltaStoreVector& stores_to_
   if (stores_to_replace.empty()) {
     // With nothing to remove, we always prepend to the front of the list.
     start_it = stores_to_update->begin();
-
   } else {
     // With something to remove, we do a range-replace.
-    start_it =
-        std::find(stores_to_update->begin(), stores_to_update->end(), stores_to_replace[0]);
-
+    start_it = std::find(stores_to_update->begin(), stores_to_update->end(), stores_to_replace[0]);
     auto end_it = start_it;
     for (const shared_ptr<DeltaStore>& ds : stores_to_replace) {
-      if (end_it == stores_to_update->end() || *end_it != ds) {
-        return Status::InvalidArgument(
-            strings::Substitute("Cannot find $0 deltastore sequence <$1> in <$2>",
-                                DeltaType_Name(type),
-                                JoinDeltaStoreStrings(stores_to_replace),
-                                JoinDeltaStoreStrings(*stores_to_update)));
-      }
+      CHECK(end_it != stores_to_update->end() && *end_it == ds) <<
+          Substitute("Cannot find $0 deltastore sequence <$1> in <$2>",
+                     DeltaType_Name(type),
+                     JoinDeltaStoreStrings(stores_to_replace),
+                     JoinDeltaStoreStrings(*stores_to_update));
       ++end_it;
     }
     // Remove the old stores.
@@ -298,12 +291,15 @@ Status DeltaTracker::AtomicUpdateStores(const SharedDeltaStoreVector& stores_to_
 #ifndef NDEBUG
   // Perform validation checks to ensure callers do not violate our contract.
   if (!new_stores.empty()) {
-    // Make sure the new stores are already ordered.
-    ValidateDeltasOrdered(new_stores, type);
+    // Make sure the new stores are already ordered. Non-OK Statuses here don't
+    // indicate ordering errors, but rather, physical errors (e.g. disk
+    // failure). These don't indicate incorrectness and are thus ignored.
+    WARN_NOT_OK(ValidateDeltasOrdered(new_stores, type), "Could not validate delta order");
     if (start_it != stores_to_update->end()) {
       // Sanity check that the last store we are adding would logically appear
       // before the first store that would follow it.
-      ValidateDeltaOrder(*new_stores.rbegin(), *start_it, type);
+      WARN_NOT_OK(ValidateDeltaOrder(*new_stores.rbegin(), *start_it, type),
+                  "Could not validate delta order");
     }
   }
 #endif // NDEBUG
@@ -313,7 +309,6 @@ Status DeltaTracker::AtomicUpdateStores(const SharedDeltaStoreVector& stores_to_
 
   VLOG_WITH_PREFIX(1) << "New " << DeltaType_Name(type) << " stores: "
                       << JoinDeltaStoreStrings(*stores_to_update);
-  return Status::OK();
 }
 
 Status DeltaTracker::Compact() {
@@ -332,12 +327,17 @@ Status DeltaTracker::CommitDeltaStoreMetadataUpdate(const RowSetMetadataUpdate&
   // We enforce that with this DCHECK.
   DCHECK(!to_remove.empty());
 
-  // Update the in-memory metadata.
-  RETURN_NOT_OK(rowset_metadata_->CommitUpdate(update));
+  SharedDeltaStoreVector new_stores;
+  RETURN_NOT_OK_PREPEND(OpenDeltaReaders(new_delta_blocks, &new_stores, type),
+                        "Unable to open delta blocks");
+
+  vector<BlockId> removed_blocks;
+  rowset_metadata_->CommitUpdate(update, &removed_blocks);
+  rowset_metadata_->AddOrphanedBlocks(removed_blocks);
   // Once we successfully commit to the rowset metadata, let's ensure we update
-  // the delta stores to maintain consistency between the two. We enforce this
-  // with a CHECK_OK here.
-  CHECK_OK(AtomicUpdateStores(to_remove, new_delta_blocks, type));
+  // the delta stores to maintain consistency between the two.
+  AtomicUpdateStores(to_remove, new_stores, type);
+
   if (flush_type == FLUSH_METADATA) {
     // Flushing the metadata is considered best-effort in this function.
     // No consistency problems will be visible if we don't successfully
@@ -714,16 +714,15 @@ Status DeltaTracker::Flush(MetadataFlushType flush_type) {
   LOG_WITH_PREFIX(INFO) << "Flushing " << count << " deltas from DMS " << old_dms->id() << "...";
 
   // Now, actually flush the contents of the old DMS.
+  // TODO(awong): failures here leave a DeltaMemStore permanently in the store
+  // list. For now, handle this by CHECKing for success, but we're going to
+  // want a more concrete solution if, say, we want to handle arbitrary file
+  // errors.
+  //
   // TODO(todd): need another lock to prevent concurrent flushers
   // at some point.
   shared_ptr<DeltaFileReader> dfr;
-  Status s = FlushDMS(old_dms.get(), &dfr, flush_type);
-  CHECK(s.ok())
-    << "Failed to flush DMS: " << s.ToString()
-    << "\nTODO: need to figure out what to do with error handling "
-    << "if this fails -- we end up with a DeltaMemStore permanently "
-    << "in the store list. For now, abort.";
-
+  CHECK_OK(FlushDMS(old_dms.get(), &dfr, flush_type));
 
   // Now, re-take the lock and swap in the DeltaFileReader in place of
   // of the DeltaMemStore
@@ -732,14 +731,11 @@ Status DeltaTracker::Flush(MetadataFlushType flush_type) {
     size_t idx = redo_delta_stores_.size() - 1;
 
     CHECK_EQ(redo_delta_stores_[idx], old_dms)
-      << "Another thread modified the delta store list during flush";
+        << "Another thread modified the delta store list during flush";
     redo_delta_stores_[idx] = dfr;
   }
 
   return Status::OK();
-
-  // TODO: wherever we write stuff, we should write to a tmp path
-  // and rename to final path!
 }
 
 size_t DeltaTracker::DeltaMemStoreSize() const {

http://git-wip-us.apache.org/repos/asf/kudu/blob/fb0eced0/src/kudu/tablet/delta_tracker.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_tracker.h b/src/kudu/tablet/delta_tracker.h
index 6a49fbd..bb9bcec 100644
--- a/src/kudu/tablet/delta_tracker.h
+++ b/src/kudu/tablet/delta_tracker.h
@@ -161,11 +161,12 @@ class DeltaTracker {
   // and if so, compact the stores.
   Status Compact();
 
-  // Persist the given delta updates to disk and then make them visible via the
-  // DeltaTracker. The 'compact_flush_lock_' should be acquired before calling
-  // this method. This method should only be used for compactions or ancient
-  // history data GC, not when adding mutations, since it makes the updated
-  // stores visible before attempting to flush the metadata to disk.
+  // Updates the in-memory list of delta stores and then persists the updated
+  // metadata. This should only be used for compactions or ancient history
+  // data GC, not when adding mutations, since it makes the updated stores
+  // visible before attempting to flush the metadata to disk.
+  //
+  // The 'compact_flush_lock_' should be acquired before calling this method.
   Status CommitDeltaStoreMetadataUpdate(const RowSetMetadataUpdate& update,
                                         const SharedDeltaStoreVector& to_remove,
                                         const std::vector<BlockId>& new_delta_blocks,
@@ -192,23 +193,35 @@ class DeltaTracker {
   Status DeleteAncientUndoDeltas(Timestamp ancient_history_mark,
                                  int64_t* blocks_deleted, int64_t* bytes_deleted);
 
-  // Validate that 'first' may precede 'second' in an ordered list of deltas,
-  // given a delta type of 'type'. This should only be run in DEBUG mode.
-  void ValidateDeltaOrder(const std::shared_ptr<DeltaStore>& first,
-                          const std::shared_ptr<DeltaStore>& second,
+  // Opens the input 'blocks' of type 'type' and returns the opened delta file
+  // readers in 'stores'.
+  Status OpenDeltaReaders(const std::vector<BlockId>& blocks,
+                          std::vector<std::shared_ptr<DeltaStore> >* stores,
                           DeltaType type);
 
-  // Validate the relative ordering of the deltas in the specified list. This
+  // Validates that 'first' may precede 'second' in an ordered list of deltas,
+  // given a delta type of 'type'. This should only be run in DEBUG mode.
+  //
+  // Crashes if there is an ordering violation, and returns an error if the
+  // validation could not be performed.
+  Status ValidateDeltaOrder(const std::shared_ptr<DeltaStore>& first,
+                            const std::shared_ptr<DeltaStore>& second,
+                            DeltaType type);
+
+  // Validates the relative ordering of the deltas in the specified list. This
   // should only be run in DEBUG mode.
-  void ValidateDeltasOrdered(const SharedDeltaStoreVector& list, DeltaType type);
+  //
+  // Crashes if there is an ordering violation, and returns an error if the
+  // validation could not be performed.
+  Status ValidateDeltasOrdered(const SharedDeltaStoreVector& list, DeltaType type);
 
-  // Replace the subsequence of stores that matches 'stores_to_replace' with
+  // Replaces the subsequence of stores that matches 'stores_to_replace' with
   // delta file readers corresponding to 'new_delta_blocks', which may be empty.
   // If 'stores_to_replace' is empty then the stores represented by
   // 'new_delta_blocks' are prepended to the relevant delta stores list.
-  Status AtomicUpdateStores(const SharedDeltaStoreVector& stores_to_replace,
-                            const std::vector<BlockId>& new_delta_blocks,
-                            DeltaType type);
+  void AtomicUpdateStores(const SharedDeltaStoreVector& stores_to_replace,
+                          const SharedDeltaStoreVector& new_stores,
+                          DeltaType type);
 
   // Return the number of rows encompassed by this DeltaTracker. Note that
   // this is _not_ the number of updated rows, but rather the number of rows
@@ -262,10 +275,6 @@ class DeltaTracker {
 
   Status DoOpen();
 
-  Status OpenDeltaReaders(const std::vector<BlockId>& blocks,
-                          std::vector<std::shared_ptr<DeltaStore> >* stores,
-                          DeltaType type);
-
   Status FlushDMS(DeltaMemStore* dms,
                   std::shared_ptr<DeltaFileReader>* dfr,
                   MetadataFlushType flush_type);

http://git-wip-us.apache.org/repos/asf/kudu/blob/fb0eced0/src/kudu/tablet/diskrowset.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/diskrowset.cc b/src/kudu/tablet/diskrowset.cc
index bc3c99a..dfa5801 100644
--- a/src/kudu/tablet/diskrowset.cc
+++ b/src/kudu/tablet/diskrowset.cc
@@ -45,6 +45,7 @@
 #include "kudu/tablet/delta_stats.h"
 #include "kudu/tablet/delta_store.h"
 #include "kudu/tablet/deltafile.h"
+#include "kudu/tablet/metadata.pb.h"
 #include "kudu/tablet/multi_column_writer.h"
 #include "kudu/tablet/mutation.h"
 #include "kudu/tablet/mvcc.h"
@@ -54,6 +55,7 @@
 #include "kudu/util/locks.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 
@@ -525,10 +527,10 @@ Status DiskRowSet::Open() {
 
   rowid_t num_rows;
   RETURN_NOT_OK(base_data_->CountRows(&num_rows));
-    RETURN_NOT_OK(DeltaTracker::Open(rowset_metadata_, num_rows,
-                                     log_anchor_registry_,
-                                     mem_trackers_,
-                                     &delta_tracker_));
+  RETURN_NOT_OK(DeltaTracker::Open(rowset_metadata_, num_rows,
+                                   log_anchor_registry_,
+                                   mem_trackers_,
+                                   &delta_tracker_));
 
   open_ = true;
 
@@ -569,28 +571,42 @@ Status DiskRowSet::MajorCompactDeltaStoresWithColumnIds(const vector<ColumnId>&
 
   RETURN_NOT_OK(compaction->Compact());
 
-  // Update the metadata.
+  // Before updating anything, create a copy of the rowset metadata so we can
+  // revert changes in case of error.
+  RowSetDataPB original_pb;
+  rowset_metadata_->ToProtobuf(&original_pb);
+  auto revert_metadata_update = MakeScopedCleanup([&] {
+    LOG_WITH_PREFIX(WARNING) << "Error during major delta compaction! Rolling back rowset metadata";
+    rowset_metadata_->LoadFromPB(original_pb);
+  });
+
+  // Prepare the changes to the metadata.
   RowSetMetadataUpdate update;
-  RETURN_NOT_OK(compaction->CreateMetadataUpdate(&update));
-  RETURN_NOT_OK(rowset_metadata_->CommitUpdate(update));
+  compaction->CreateMetadataUpdate(&update);
+  vector<BlockId> removed_blocks;
+  rowset_metadata_->CommitUpdate(update, &removed_blocks);
 
-  // Since we've already updated the metadata in memory, now we update the
-  // delta tracker's stores. Those stores should match the blocks in the
-  // metadata so, since we've already updated the metadata, we use CHECK_OK
-  // here.
+  // Now that the metadata has been updated, open a new cfile set with the
+  // appropriate blocks to match the update.
   shared_ptr<CFileSet> new_base;
   RETURN_NOT_OK(CFileSet::Open(rowset_metadata_,
-                               mem_trackers_.tablet_tracker,
-                               &new_base));
+                          mem_trackers_.tablet_tracker,
+                          &new_base));
   {
+    // Update the delta tracker and the base data with the changes.
     std::lock_guard<rw_spinlock> lock(component_lock_);
-    CHECK_OK(compaction->UpdateDeltaTracker(delta_tracker_.get()));
+    RETURN_NOT_OK(compaction->UpdateDeltaTracker(delta_tracker_.get()));
     base_data_.swap(new_base);
   }
 
-  // We don't CHECK_OK on Flush here because if we don't successfully flush we
-  // don't have consistency problems in the case of major delta compaction --
-  // we are not adding additional mutations that weren't already present.
+  // Now that we've successfully compacted, add the removed blocks to the
+  // orphaned blocks list and cancel cleanup.
+  rowset_metadata_->AddOrphanedBlocks(removed_blocks);
+  revert_metadata_update.cancel();
+
+  // Even if we don't successfully flush we don't have consistency problems in
+  // the case of major delta compaction -- we are not adding additional
+  // mutations that weren't already present.
   return rowset_metadata_->Flush();
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/fb0eced0/src/kudu/tablet/diskrowset.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/diskrowset.h b/src/kudu/tablet/diskrowset.h
index 471d7e7..eb64dd5 100644
--- a/src/kudu/tablet/diskrowset.h
+++ b/src/kudu/tablet/diskrowset.h
@@ -141,7 +141,7 @@ class DiskRowSetWriter {
   // (the ad-hoc writer for composite keys, otherwise the key column writer)
   cfile::CFileWriter *key_index_writer();
 
-  RowSetMetadata *rowset_metadata_;
+  RowSetMetadata* rowset_metadata_;
   const Schema* const schema_;
 
   BloomFilterSizing bloom_sizing_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/fb0eced0/src/kudu/tablet/metadata-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/metadata-test.cc b/src/kudu/tablet/metadata-test.cc
index c213540..ec77b00 100644
--- a/src/kudu/tablet/metadata-test.cc
+++ b/src/kudu/tablet/metadata-test.cc
@@ -28,7 +28,6 @@
 #include "kudu/tablet/rowset_metadata.h"
 #include "kudu/tablet/tablet_metadata.h"
 #include "kudu/util/status.h"
-#include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
 using std::unique_ptr;
@@ -64,11 +63,14 @@ TEST_F(MetadataTest, RSMD_TestReplaceDeltas_1) {
   to_replace.emplace_back(2);
   to_replace.emplace_back(3);
 
-  ASSERT_OK(meta_->CommitUpdate(
-              RowSetMetadataUpdate()
-              .ReplaceRedoDeltaBlocks(to_replace, { BlockId(123) })));
+  vector<BlockId> removed;
+  meta_->CommitUpdate(
+      RowSetMetadataUpdate()
+      .ReplaceRedoDeltaBlocks(to_replace, { BlockId(123) }), &removed);
   ASSERT_EQ(vector<BlockId>({ BlockId(1), BlockId(123), BlockId(4) }),
             meta_->redo_delta_blocks());
+  ASSERT_EQ(vector<BlockId>({ BlockId(2), BlockId(3) }),
+            removed);
 }
 
 // Swap out some deltas from the beginning of the list
@@ -77,11 +79,14 @@ TEST_F(MetadataTest, RSMD_TestReplaceDeltas_2) {
   to_replace.emplace_back(1);
   to_replace.emplace_back(2);
 
-  ASSERT_OK(meta_->CommitUpdate(
-              RowSetMetadataUpdate()
-              .ReplaceRedoDeltaBlocks(to_replace, { BlockId(123) })));
+  vector<BlockId> removed;
+  meta_->CommitUpdate(
+      RowSetMetadataUpdate()
+      .ReplaceRedoDeltaBlocks(to_replace, { BlockId(123) }), &removed);
   ASSERT_EQ(vector<BlockId>({ BlockId(123), BlockId(3), BlockId(4) }),
             meta_->redo_delta_blocks());
+  ASSERT_EQ(vector<BlockId>({ BlockId(1), BlockId(2) }),
+            removed);
 }
 
 // Swap out some deltas from the end of the list
@@ -90,11 +95,14 @@ TEST_F(MetadataTest, RSMD_TestReplaceDeltas_3) {
   to_replace.emplace_back(3);
   to_replace.emplace_back(4);
 
-  ASSERT_OK(meta_->CommitUpdate(
-              RowSetMetadataUpdate()
-              .ReplaceRedoDeltaBlocks(to_replace, { BlockId(123) })));
+  vector<BlockId> removed;
+  meta_->CommitUpdate(
+      RowSetMetadataUpdate()
+      .ReplaceRedoDeltaBlocks(to_replace, { BlockId(123) }), &removed);
   ASSERT_EQ(vector<BlockId>({ BlockId(1), BlockId(2), BlockId(123) }),
             meta_->redo_delta_blocks());
+  ASSERT_EQ(vector<BlockId>({ BlockId(3), BlockId(4) }),
+            removed);
 }
 
 // Swap out a non-contiguous list, check error.
@@ -103,16 +111,14 @@ TEST_F(MetadataTest, RSMD_TestReplaceDeltas_Bad_NonContiguous) {
   to_replace.emplace_back(2);
   to_replace.emplace_back(4);
 
-  Status s = meta_->CommitUpdate(
-    RowSetMetadataUpdate()
-    .ReplaceRedoDeltaBlocks(to_replace, { BlockId(123) }));
-  EXPECT_EQ(Substitute("Invalid argument: Cannot find subsequence <$0> in <$1>",
-                       BlockId::JoinStrings(to_replace),
-                       BlockId::JoinStrings(all_blocks_)),
-            s.ToString());
-
-  // Should be unchanged
-  EXPECT_EQ(all_blocks_, meta_->redo_delta_blocks());
+  EXPECT_DEATH({
+    vector<BlockId> removed;
+    meta_->CommitUpdate(
+        RowSetMetadataUpdate().ReplaceRedoDeltaBlocks(to_replace, { BlockId(123) }),
+        &removed);
+  }, Substitute(".*Cannot find subsequence <$0> in <$1>.*",
+                BlockId::JoinStrings(to_replace),
+                BlockId::JoinStrings(all_blocks_)));
 }
 
 // Swap out a list which contains an invalid element, check error.
@@ -120,16 +126,14 @@ TEST_F(MetadataTest, RSMD_TestReplaceDeltas_Bad_DoesntExist) {
   vector<BlockId> to_replace;
   to_replace.emplace_back(555);
 
-  Status s = meta_->CommitUpdate(
-    RowSetMetadataUpdate()
-    .ReplaceRedoDeltaBlocks(to_replace, { BlockId(123) }));
-  EXPECT_EQ(Substitute("Invalid argument: Cannot find subsequence <$0> in <$1>",
-                       BlockId::JoinStrings(to_replace),
-                       BlockId::JoinStrings(all_blocks_)),
-            s.ToString());
-
-  // Should be unchanged
-  EXPECT_EQ(all_blocks_, meta_->redo_delta_blocks());
+  EXPECT_DEATH({
+    vector<BlockId> removed;
+    meta_->CommitUpdate(
+        RowSetMetadataUpdate().ReplaceRedoDeltaBlocks(to_replace, { BlockId(123) }),
+        &removed);
+  },Substitute(".*Cannot find subsequence <$0> in <$1>",
+               BlockId::JoinStrings(to_replace),
+               BlockId::JoinStrings(all_blocks_)));
 }
 
 } // namespace tablet

http://git-wip-us.apache.org/repos/asf/kudu/blob/fb0eced0/src/kudu/tablet/rowset_metadata.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/rowset_metadata.cc b/src/kudu/tablet/rowset_metadata.cc
index f459310..87fa63d 100644
--- a/src/kudu/tablet/rowset_metadata.cc
+++ b/src/kudu/tablet/rowset_metadata.cc
@@ -56,6 +56,10 @@ Status RowSetMetadata::CreateNew(TabletMetadata* tablet_metadata,
   return Status::OK();
 }
 
+void RowSetMetadata::AddOrphanedBlocks(const vector<BlockId>& blocks) {
+  tablet_metadata_->AddOrphanedBlocks(blocks);
+}
+
 Status RowSetMetadata::Flush() {
   return tablet_metadata_->Flush();
 }
@@ -63,38 +67,48 @@ Status RowSetMetadata::Flush() {
 Status RowSetMetadata::InitFromPB(const RowSetDataPB& pb) {
   CHECK(!initted_);
 
+  LoadFromPB(pb);
+
+  initted_ = true;
+  return Status::OK();
+}
+
+void RowSetMetadata::LoadFromPB(const RowSetDataPB& pb) {
+  std::lock_guard<LockType> l(lock_);
   id_ = pb.id();
 
-  // Load Bloom File
+  // Load Bloom File.
+  bloom_block_ = BlockId();
   if (pb.has_bloom_block()) {
     bloom_block_ = BlockId::FromPB(pb.bloom_block());
   }
 
-  // Load AdHoc Index File
+  // Load AdHoc Index File.
+  adhoc_index_block_ = BlockId();
   if (pb.has_adhoc_index_block()) {
     adhoc_index_block_ = BlockId::FromPB(pb.adhoc_index_block());
   }
 
-  // Load Column Files
+  // Load Column Files.
+  blocks_by_col_id_.clear();
   for (const ColumnDataPB& col_pb : pb.columns()) {
     ColumnId col_id = ColumnId(col_pb.column_id());
     blocks_by_col_id_[col_id] = BlockId::FromPB(col_pb.block());
   }
 
-  // Load redo delta files
+  // Load redo delta files.
+  redo_delta_blocks_.clear();
   for (const DeltaDataPB& redo_delta_pb : pb.redo_deltas()) {
     redo_delta_blocks_.push_back(BlockId::FromPB(redo_delta_pb.block()));
   }
 
   last_durable_redo_dms_id_ = pb.last_durable_dms_id();
 
-  // Load undo delta files
+  // Load undo delta files.
+  undo_delta_blocks_.clear();
   for (const DeltaDataPB& undo_delta_pb : pb.undo_deltas()) {
     undo_delta_blocks_.push_back(BlockId::FromPB(undo_delta_pb.block()));
   }
-
-  initted_ = true;
-  return Status::OK();
 }
 
 void RowSetMetadata::ToProtobuf(RowSetDataPB *pb) {
@@ -161,13 +175,14 @@ Status RowSetMetadata::CommitUndoDeltaDataBlock(const BlockId& block_id) {
   return Status::OK();
 }
 
-Status RowSetMetadata::CommitUpdate(const RowSetMetadataUpdate& update) {
-  vector<BlockId> removed;
+void RowSetMetadata::CommitUpdate(const RowSetMetadataUpdate& update,
+                                  vector<BlockId>* removed) {
+  removed->clear();
   {
     std::lock_guard<LockType> l(lock_);
 
-    for (const RowSetMetadataUpdate::ReplaceDeltaBlocks rep :
-                  update.replace_redo_blocks_) {
+    // Find the exact sequence of blocks to remove.
+    for (const auto& rep : update.replace_redo_blocks_) {
       CHECK(!rep.to_remove.empty());
 
       auto start_it = std::find(redo_delta_blocks_.begin(),
@@ -175,16 +190,14 @@ Status RowSetMetadata::CommitUpdate(const RowSetMetadataUpdate& update) {
 
       auto end_it = start_it;
       for (const BlockId& b : rep.to_remove) {
-        if (end_it == redo_delta_blocks_.end() || *end_it != b) {
-          return Status::InvalidArgument(
-              Substitute("Cannot find subsequence <$0> in <$1>",
-                         BlockId::JoinStrings(rep.to_remove),
-                         BlockId::JoinStrings(redo_delta_blocks_)));
-        }
+        CHECK(end_it != redo_delta_blocks_.end() && *end_it == b) <<
+            Substitute("Cannot find subsequence <$0> in <$1>",
+                       BlockId::JoinStrings(rep.to_remove),
+                       BlockId::JoinStrings(redo_delta_blocks_));
         ++end_it;
       }
 
-      removed.insert(removed.end(), start_it, end_it);
+      removed->insert(removed->end(), start_it, end_it);
       redo_delta_blocks_.erase(start_it, end_it);
       redo_delta_blocks_.insert(start_it, rep.to_add.begin(), rep.to_add.end());
     }
@@ -197,14 +210,12 @@ Status RowSetMetadata::CommitUpdate(const RowSetMetadataUpdate& update) {
     // Remove undo blocks.
     BlockIdSet undos_to_remove(update.remove_undo_blocks_.begin(),
                                update.remove_undo_blocks_.end());
-    int64_t num_removed = 0;
     auto iter = undo_delta_blocks_.begin();
     while (iter != undo_delta_blocks_.end()) {
       if (ContainsKey(undos_to_remove, *iter)) {
-        removed.push_back(*iter);
+        removed->push_back(*iter);
         undos_to_remove.erase(*iter);
         iter = undo_delta_blocks_.erase(iter);
-        num_removed++;
       } else {
         ++iter;
       }
@@ -212,7 +223,7 @@ Status RowSetMetadata::CommitUpdate(const RowSetMetadataUpdate& update) {
     CHECK(undos_to_remove.empty())
         << "Tablet " << tablet_metadata_->tablet_id() << " RowSet " << id_ << ": "
         << "Attempted to remove an undo delta block from the RowSetMetadata that is not present. "
-        << "Removed: { " << removed << " }; "
+        << "Removed: { " << *removed << " }; "
         << "Failed to remove: { "
         << vector<BlockId>(undos_to_remove.begin(), undos_to_remove.end())
         << " }";
@@ -228,24 +239,18 @@ Status RowSetMetadata::CommitUpdate(const RowSetMetadataUpdate& update) {
       // block there to replace.
       BlockId old_block_id;
       if (UpdateReturnCopy(&blocks_by_col_id_, e.first, e.second, &old_block_id)) {
-        removed.push_back(old_block_id);
+        removed->push_back(old_block_id);
       }
     }
 
-    for (ColumnId col_id : update.col_ids_to_remove_) {
+    for (const ColumnId& col_id : update.col_ids_to_remove_) {
       BlockId old = FindOrDie(blocks_by_col_id_, col_id);
       CHECK_EQ(1, blocks_by_col_id_.erase(col_id));
-      removed.push_back(old);
+      removed->push_back(old);
     }
   }
 
   blocks_by_col_id_.shrink_to_fit();
-
-  // Should only be NULL in tests.
-  if (tablet_metadata()) {
-    tablet_metadata()->AddOrphanedBlocks(removed);
-  }
-  return Status::OK();
 }
 
 vector<BlockId> RowSetMetadata::GetAllBlocks() {

http://git-wip-us.apache.org/repos/asf/kudu/blob/fb0eced0/src/kudu/tablet/rowset_metadata.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/rowset_metadata.h b/src/kudu/tablet/rowset_metadata.h
index c546cb5..fa98897 100644
--- a/src/kudu/tablet/rowset_metadata.h
+++ b/src/kudu/tablet/rowset_metadata.h
@@ -82,8 +82,17 @@ class RowSetMetadata {
                      const RowSetDataPB& pb,
                      std::unique_ptr<RowSetMetadata>* metadata);
 
+  // Resets the in-memory state to mirror 'pb'.
+  //
+  // Should only be used to initialize a RowSetMetadata instance or to
+  // immediately roll-back an updated RowSetMetadata instance following an
+  // error after calling CommitUpdate().
+  void LoadFromPB(const RowSetDataPB& pb);
+
   Status Flush();
 
+  void AddOrphanedBlocks(const std::vector<BlockId>& blocks);
+
   const std::string ToString() const;
 
   int64_t id() const { return id_; }
@@ -173,8 +182,10 @@ class RowSetMetadata {
 
   // Atomically commit a set of changes to this object.
   //
-  // On success, calls TabletMetadata::AddOrphanedBlocks() on the removed blocks.
-  Status CommitUpdate(const RowSetMetadataUpdate& update);
+  // Returns the blocks removed from the rowset metadata during the update.
+  // These blocks must be added to the TabletMetadata's orphaned blocks list.
+  void CommitUpdate(const RowSetMetadataUpdate& update,
+                    std::vector<BlockId>* removed);
 
   void ToProtobuf(RowSetDataPB *pb);
 


[3/3] kudu git commit: handle disk failures during tablet copies

Posted by to...@apache.org.
handle disk failures during tablet copies

There are two components in a tablet copy: the copy client (that
receives data) and the copy session source (that sends data).

Coarse-grain handling of disk failures during tablet copies is done for
the tablet copy client as:
- Before starting a copy client, if no disks are available to place the
  tablet, simply return (instead of failing a CHECK).
- Before downloading each WAL segment or block, check that the tablet
  is in a healthy directory group.
And for the tablet copy session as:
- Before sending a block or log segment, check if the tablet has an
  error.

Upon returning an error, the tablet copy client will shut down the
replica, leaving it in a failed state.

Tests are added to ensure that both copy clients and copy source
sessions with failed disks will return errors to the copying client.

Change-Id: Ic18d93c218ea13f3086f420a4847cb5e29a47bc7
Reviewed-on: http://gerrit.cloudera.org:8080/7654
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mp...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/8a81f4ff
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/8a81f4ff
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/8a81f4ff

Branch: refs/heads/master
Commit: 8a81f4ff18ee30951a25bab247fa5a3c20058e49
Parents: ae0143c
Author: Andrew Wong <aw...@cloudera.com>
Authored: Wed Oct 25 14:49:57 2017 -0700
Committer: Andrew Wong <aw...@cloudera.com>
Committed: Wed Nov 22 06:39:38 2017 +0000

----------------------------------------------------------------------
 src/kudu/tserver/tablet_copy-test-base.h       |  5 ++-
 src/kudu/tserver/tablet_copy_client-test.cc    | 47 ++++++++++++++++++---
 src/kudu/tserver/tablet_copy_client.cc         | 14 +++++-
 src/kudu/tserver/tablet_copy_client.h          |  4 ++
 src/kudu/tserver/tablet_copy_service-test.cc   | 40 ++++++++++++++++++
 src/kudu/tserver/tablet_copy_source_session.cc | 17 ++++++++
 src/kudu/tserver/tablet_copy_source_session.h  |  4 ++
 7 files changed, 124 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/8a81f4ff/src/kudu/tserver/tablet_copy-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy-test-base.h b/src/kudu/tserver/tablet_copy-test-base.h
index 8da5f37..638435d 100644
--- a/src/kudu/tserver/tablet_copy-test-base.h
+++ b/src/kudu/tserver/tablet_copy-test-base.h
@@ -43,7 +43,10 @@ class TabletCopyTest : public TabletServerTestBase {
  public:
   virtual void SetUp() OVERRIDE {
     NO_FATALS(TabletServerTestBase::SetUp());
-    NO_FATALS(StartTabletServer(/* num_data_dirs */ 1));
+    // Create a tablet server with multiple data dirs. In most cases, this is
+    // unimportant, but in some cases can be helpful to test multi-disk
+    // behavior and disk failures.
+    NO_FATALS(StartTabletServer(/* num_data_dirs */ 3));
     // Prevent logs from being deleted out from under us until / unless we want
     // to test that we are anchoring correctly. Since GenerateTestData() does a
     // Flush(), Log GC is allowed to eat the logs before we get around to

http://git-wip-us.apache.org/repos/asf/kudu/blob/8a81f4ff/src/kudu/tserver/tablet_copy_client-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_client-test.cc b/src/kudu/tserver/tablet_copy_client-test.cc
index ab99a21..277d8ff 100644
--- a/src/kudu/tserver/tablet_copy_client-test.cc
+++ b/src/kudu/tserver/tablet_copy_client-test.cc
@@ -20,7 +20,9 @@
 #include <limits>
 #include <memory>
 #include <ostream>
+#include <stdlib.h>
 #include <string>
+#include <thread>
 #include <vector>
 
 #include <gflags/gflags_declare.h>
@@ -38,6 +40,7 @@
 #include "kudu/consensus/raft_consensus.h"
 #include "kudu/fs/block_id.h"
 #include "kudu/fs/block_manager.h"
+#include "kudu/fs/data_dirs.h"
 #include "kudu/fs/fs_manager.h"
 #include "kudu/gutil/casts.h"
 #include "kudu/gutil/gscoped_ptr.h"
@@ -64,6 +67,7 @@
 
 using std::shared_ptr;
 using std::string;
+using std::thread;
 using std::vector;
 
 DECLARE_string(block_manager);
@@ -77,6 +81,7 @@ using consensus::ConsensusMetadataManager;
 using consensus::ConsensusStatePB;
 using consensus::GetRaftConfigLeader;
 using consensus::RaftPeerPB;
+using fs::DataDirManager;
 using std::tuple;
 using std::unique_ptr;
 using tablet::TabletMetadata;
@@ -86,10 +91,14 @@ class TabletCopyClientTest : public TabletCopyTest {
   virtual void SetUp() OVERRIDE {
     NO_FATALS(TabletCopyTest::SetUp());
 
-    const string kTestDir = GetTestPath("client_tablet");
+    // To be a bit more flexible in testing, create a FS layout with multiple disks.
+    const string kTestWalDir = GetTestPath("client_tablet_wal");
+    const string kTestDataDirPrefix = GetTestPath("client_tablet_data");
     FsManagerOpts opts;
-    opts.wal_root = kTestDir;
-    opts.data_roots = { kTestDir };
+    opts.wal_root = kTestWalDir;
+    opts.data_roots.emplace_back(kTestDataDirPrefix + "A");
+    opts.data_roots.emplace_back(kTestDataDirPrefix + "B");
+
     metric_entity_ = METRIC_ENTITY_server.Instantiate(&metric_registry_, "test");
     opts.metric_entity = metric_entity_;
     fs_manager_.reset(new FsManager(Env::Default(), opts));
@@ -251,10 +260,10 @@ TEST_F(TabletCopyClientTest, TestDownloadAllBlocks) {
 
   // Verify the disk synchronization count.
   if (FLAGS_block_manager == "log") {
-    ASSERT_EQ(3, down_cast<Counter*>(
+    ASSERT_EQ(6, down_cast<Counter*>(
         metric_entity_->FindOrNull(METRIC_block_manager_total_disk_sync).get())->value());
   } else {
-    ASSERT_EQ(14, down_cast<Counter*>(
+    ASSERT_EQ(18, down_cast<Counter*>(
         metric_entity_->FindOrNull(METRIC_block_manager_total_disk_sync).get())->value());
   }
 
@@ -271,6 +280,34 @@ TEST_F(TabletCopyClientTest, TestDownloadAllBlocks) {
   }
 }
 
+// Test that failing a disk outside of the tablet copy client will eventually
+// stop the copy client and cause it to fail.
+TEST_F(TabletCopyClientTest, TestFailedDiskStopsClient) {
+  DataDirManager* dd_manager = fs_manager_->dd_manager();
+
+  // Repeatedly fetch files for the client.
+  Status s;
+  auto copy_thread = thread([&] {
+    while (s.ok()) {
+      s = client_->FetchAll(nullptr);
+    }
+  });
+
+  // In a separate thread, mark one of the directories as failed.
+  while (true) {
+    if (rand() % 10 == 0) {
+      dd_manager->MarkDataDirFailed(0, "injected failure in non-client thread");
+      LOG(INFO) << "INJECTING FAILURE";
+      break;
+    }
+    SleepFor(MonoDelta::FromMilliseconds(50));
+  }
+
+  // The copy thread should stop and the copy client should return an error.
+  copy_thread.join();
+  ASSERT_TRUE(s.IsIOError());
+}
+
 enum DownloadBlocks {
   kDownloadBlocks,    // Fetch blocks from remote.
   kNoDownloadBlocks,  // Do not fetch blocks from remote.

http://git-wip-us.apache.org/repos/asf/kudu/blob/8a81f4ff/src/kudu/tserver/tablet_copy_client.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_client.cc b/src/kudu/tserver/tablet_copy_client.cc
index 13fe64e..4e16e7d 100644
--- a/src/kudu/tserver/tablet_copy_client.cc
+++ b/src/kudu/tserver/tablet_copy_client.cc
@@ -325,7 +325,8 @@ Status TabletCopyClient::Start(const HostPort& copy_source_addr,
                                           tablet::TABLET_DATA_COPYING,
                                           /*last_logged_opid=*/ boost::none),
         "Could not replace superblock with COPYING data state");
-    CHECK_OK(fs_manager_->dd_manager()->CreateDataDirGroup(tablet_id_));
+    RETURN_NOT_OK_PREPEND(fs_manager_->dd_manager()->CreateDataDirGroup(tablet_id_),
+        "Could not create a new directory group for tablet copy");
   } else {
     // HACK: Set the initial tombstoned last-logged OpId to 1.0 when copying a
     // replica for the first time, so that if the tablet copy fails, the
@@ -586,6 +587,8 @@ Status TabletCopyClient::DownloadBlocks() {
 
 Status TabletCopyClient::DownloadWAL(uint64_t wal_segment_seqno) {
   VLOG_WITH_PREFIX(1) << "Downloading WAL segment with seqno " << wal_segment_seqno;
+  RETURN_NOT_OK_PREPEND(CheckHealthyDirGroup(), "Not downloading WAL for replica");
+
   DataIdPB data_id;
   data_id.set_type(DataIdPB::LOG_SEGMENT);
   data_id.set_wal_segment_seqno(wal_segment_seqno);
@@ -648,6 +651,7 @@ Status TabletCopyClient::DownloadAndRewriteBlock(const BlockIdPB& src_block_id,
 Status TabletCopyClient::DownloadBlock(const BlockId& old_block_id,
                                        BlockId* new_block_id) {
   VLOG_WITH_PREFIX(1) << "Downloading block with block_id " << old_block_id.ToString();
+  RETURN_NOT_OK_PREPEND(CheckHealthyDirGroup(), "Not downloading block for replica");
 
   unique_ptr<WritableBlock> block;
   RETURN_NOT_OK_PREPEND(fs_manager_->CreateNewBlock(CreateBlockOptions({ tablet_id_ }), &block),
@@ -740,6 +744,14 @@ string TabletCopyClient::LogPrefix() {
                     tablet_id_, fs_manager_->uuid());
 }
 
+Status TabletCopyClient::CheckHealthyDirGroup() const {
+  if (fs_manager_->dd_manager()->IsTabletInFailedDir(tablet_id_)) {
+    return Status::IOError(
+        Substitute("Tablet $0 is in a failed directory", tablet_id_));
+  }
+  return Status::OK();
+}
+
 template<typename F>
 Status TabletCopyClient::SendRpcWithRetry(rpc::RpcController* controller, F f) {
   MonoTime deadline = MonoTime::Now() + MonoDelta::FromMilliseconds(session_idle_timeout_millis_);

http://git-wip-us.apache.org/repos/asf/kudu/blob/8a81f4ff/src/kudu/tserver/tablet_copy_client.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_client.h b/src/kudu/tserver/tablet_copy_client.h
index f44e5c2..018c6db 100644
--- a/src/kudu/tserver/tablet_copy_client.h
+++ b/src/kudu/tserver/tablet_copy_client.h
@@ -145,6 +145,10 @@ class TabletCopyClient {
 
   static Status UnwindRemoteError(const Status& status, const rpc::RpcController& controller);
 
+  // Returns an error if any directories in the tablet's directory group are
+  // unhealthy.
+  Status CheckHealthyDirGroup() const;
+
   // Set a new status message on the TabletReplica.
   // The string "TabletCopy: " will be prepended to each message.
   void SetStatusMessage(const std::string& message);

http://git-wip-us.apache.org/repos/asf/kudu/blob/8a81f4ff/src/kudu/tserver/tablet_copy_service-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_service-test.cc b/src/kudu/tserver/tablet_copy_service-test.cc
index 7dd409f..8707f9e 100644
--- a/src/kudu/tserver/tablet_copy_service-test.cc
+++ b/src/kudu/tserver/tablet_copy_service-test.cc
@@ -20,6 +20,7 @@
 #include <cstdint>
 #include <limits>
 #include <memory>
+#include <ostream>
 #include <string>
 #include <thread>
 #include <vector>
@@ -34,7 +35,9 @@
 #include "kudu/consensus/log_reader.h"
 #include "kudu/consensus/log_util.h"
 #include "kudu/fs/block_id.h"
+#include "kudu/fs/data_dirs.h"
 #include "kudu/fs/fs.pb.h"
+#include "kudu/fs/fs_manager.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
@@ -55,6 +58,8 @@
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 
+DECLARE_bool(crash_on_eio);
+DECLARE_double(env_inject_eio);
 DECLARE_uint64(tablet_copy_idle_timeout_sec);
 DECLARE_uint64(tablet_copy_timeout_poll_period_ms);
 
@@ -509,5 +514,40 @@ TEST_F(TabletCopyServiceTest, TestSessionTimeout) {
   ASSERT_FALSE(resp.session_is_active()) << "Tablet Copy session did not time out!";
 }
 
+// Test that the tablet copy session will terminate on disk failures.
+TEST_F(TabletCopyServiceTest, TestDiskFailureDuringSession) {
+  string session_id;
+  tablet::TabletSuperBlockPB superblock;
+  ASSERT_OK(DoBeginValidTabletCopySession(&session_id, &superblock));
+
+  // Get a block id locally that we'll copy.
+  BlockId block_id = FirstColumnBlockId(superblock);
+  Slice local_data;
+  faststring scratch;
+  ASSERT_OK(ReadLocalBlockFile(mini_server_->server()->fs_manager(), block_id,
+                               &scratch, &local_data));
+
+  // Copy over the block while one of the directories is failed.
+  FetchDataResponsePB resp;
+  RpcController controller;
+  ASSERT_OK(mini_server_->server()->fs_manager()->dd_manager()->MarkDataDirFailed(1));
+  Status s = DoFetchData(session_id, AsDataTypeId(block_id), nullptr, nullptr, &resp, &controller);
+  LOG(INFO) << "Fetch data request responded with: " << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "Unable to get piece of data block");
+  ASSERT_TRUE(s.IsRemoteError());
+
+  // Now close the copy session.
+  RpcController end_copy_controller;
+  EndTabletCopySessionResponsePB end_session_resp;
+  ASSERT_OK(DoEndTabletCopySession(
+      session_id, true, nullptr, &end_session_resp, &end_copy_controller));
+
+  // Starting a new session should fail.
+  s = DoBeginValidTabletCopySession(&session_id, &superblock);
+  LOG(INFO) << "Begin copy session request responded with: " << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "Error beginning tablet copy session");
+  ASSERT_TRUE(s.IsRemoteError());
+}
+
 } // namespace tserver
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/8a81f4ff/src/kudu/tserver/tablet_copy_source_session.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_source_session.cc b/src/kudu/tserver/tablet_copy_source_session.cc
index 229fe3b..e2a5227 100644
--- a/src/kudu/tserver/tablet_copy_source_session.cc
+++ b/src/kudu/tserver/tablet_copy_source_session.cc
@@ -34,6 +34,7 @@
 #include "kudu/consensus/opid_util.h"
 #include "kudu/consensus/raft_consensus.h"
 #include "kudu/fs/block_manager.h"
+#include "kudu/fs/data_dirs.h"
 #include "kudu/fs/fs.pb.h"
 #include "kudu/fs/fs_manager.h"
 #include "kudu/gutil/basictypes.h"
@@ -129,6 +130,7 @@ Status TabletCopySourceSession::InitOnce() {
   }
 
   RETURN_NOT_OK(tablet_replica_->CheckRunning());
+  RETURN_NOT_OK(CheckHealthyDirGroup());
 
   const string& tablet_id = tablet_replica_->tablet_id();
 
@@ -300,6 +302,8 @@ Status TabletCopySourceSession::GetBlockPiece(const BlockId& block_id,
                                              string* data, int64_t* block_file_size,
                                              TabletCopyErrorPB::Code* error_code) {
   DCHECK(init_once_.init_succeeded());
+  RETURN_NOT_OK_PREPEND(CheckHealthyDirGroup(error_code),
+                        "Tablet copy source could not get block");
   ImmutableReadableBlockInfo* block_info;
   RETURN_NOT_OK(FindBlock(block_id, &block_info, error_code));
 
@@ -320,6 +324,8 @@ Status TabletCopySourceSession::GetLogSegmentPiece(uint64_t segment_seqno,
                                                    std::string* data, int64_t* log_file_size,
                                                    TabletCopyErrorPB::Code* error_code) {
   DCHECK(init_once_.init_succeeded());
+  RETURN_NOT_OK_PREPEND(CheckHealthyDirGroup(error_code),
+                        "Tablet copy source could not get log segment");
   ImmutableRandomAccessFileInfo* file_info;
   RETURN_NOT_OK(FindLogSegment(segment_seqno, &file_info, error_code));
   RETURN_NOT_OK(ReadFileChunkToBuf(file_info, offset, client_maxlen,
@@ -356,6 +362,17 @@ static Status AddImmutableFileToMap(Collection* const cache,
   return Status::OK();
 }
 
+Status TabletCopySourceSession::CheckHealthyDirGroup(TabletCopyErrorPB::Code* error_code) const {
+  if (fs_manager_->dd_manager()->IsTabletInFailedDir(tablet_id())) {
+    if (error_code) {
+      *error_code = TabletCopyErrorPB::IO_ERROR;
+    }
+    return Status::IOError(
+        Substitute("Tablet $0 is in a failed directory", tablet_id()));
+  }
+  return Status::OK();
+}
+
 Status TabletCopySourceSession::OpenBlock(const BlockId& block_id) {
   unique_ptr<ReadableBlock> block;
   Status s = fs_manager_->OpenBlock(block_id, &block);

http://git-wip-us.apache.org/repos/asf/kudu/blob/8a81f4ff/src/kudu/tserver/tablet_copy_source_session.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_source_session.h b/src/kudu/tserver/tablet_copy_source_session.h
index 4f6c350..3c433ed 100644
--- a/src/kudu/tserver/tablet_copy_source_session.h
+++ b/src/kudu/tserver/tablet_copy_source_session.h
@@ -174,6 +174,10 @@ class TabletCopySourceSession : public RefCountedThreadSafe<TabletCopySourceSess
   // Internal helper method for Init().
   Status InitOnce();
 
+  // Returns an error if any directories in the tablet's directory group are
+  // unhealthy and sets 'error_code' appropriately.
+  Status CheckHealthyDirGroup(TabletCopyErrorPB::Code* error_code = nullptr) const;
+
   // Open the block and add it to the block map.
   Status OpenBlock(const BlockId& block_id);
 


[2/3] kudu git commit: tablet: add early returns to maintenance functions

Posted by to...@apache.org.
tablet: add early returns to maintenance functions

When a Tablet is stopped, further maintenance ops are not scheduled,
further IO is prevented, etc. This patch optimizes this further by making
various functions that are called by maintenance threads return an error
early instead of executing.

Previously, certain ops (e.g. flush DMS) would guarantee durability by
checking for the success of these functions. These checks are now replaced
with on-error checks that the tablet has been stopped, since these
failures are inconsequential if the tablet is stopped.

Currently this is an optimization for tablets that are shutting down to
return early from these calls, but in the future, this could be useful
in stopping all IO done by a tablet that is failing, e.g. due to disk
failure.

Change-Id: I84ad557851863f6fd9acff28ddcd1244e62cf516
Reviewed-on: http://gerrit.cloudera.org:8080/8606
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mp...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/ae0143ca
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/ae0143ca
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/ae0143ca

Branch: refs/heads/master
Commit: ae0143ca5dca54a527c905043f96ebf2b293629b
Parents: fb0eced
Author: Andrew Wong <aw...@cloudera.com>
Authored: Thu Oct 26 12:10:21 2017 -0700
Committer: Andrew Wong <aw...@cloudera.com>
Committed: Wed Nov 22 06:39:16 2017 +0000

----------------------------------------------------------------------
 src/kudu/integration-tests/stop_tablet-itest.cc |  6 ++++++
 src/kudu/tablet/tablet.cc                       |  9 +++++++--
 src/kudu/tablet/tablet_replica.cc               |  7 +++++++
 src/kudu/tablet/tablet_replica.h                |  4 ++++
 src/kudu/tablet/tablet_replica_mm_ops.cc        | 16 ++++++++++------
 5 files changed, 34 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/ae0143ca/src/kudu/integration-tests/stop_tablet-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/stop_tablet-itest.cc b/src/kudu/integration-tests/stop_tablet-itest.cc
index c182b6c..04b57b6 100644
--- a/src/kudu/integration-tests/stop_tablet-itest.cc
+++ b/src/kudu/integration-tests/stop_tablet-itest.cc
@@ -97,6 +97,12 @@ class StopTabletITest : public MiniClusterITestBase,
     LOG(INFO) << Substitute("Stopping T $0 P $1", tablet_id, ts->uuid());;
     ASSERT_OK(ts->server()->tablet_manager()->GetTabletReplica(tablet_id, &replica));
     replica->tablet()->Stop();
+
+    // We need to also stop replica ops since, upon running with a stopped
+    // tablet, they'll abort immediately and hog the maintenance threads. In
+    // production, this isn't an issue because replica ops are expected to be
+    // unregistered shortly after stopping the tablet.
+    replica->CancelMaintenanceOpsForTests();
   }
 
   // Starts a new cluster and creates a tablet with the specified number of

http://git-wip-us.apache.org/repos/asf/kudu/blob/ae0143ca/src/kudu/tablet/tablet.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index 15d4d4f..887bb7e 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -1065,6 +1065,10 @@ Status Tablet::Flush() {
 
 Status Tablet::FlushUnlocked() {
   TRACE_EVENT0("tablet", "Tablet::FlushUnlocked");
+  {
+    std::lock_guard<simple_spinlock> l(state_lock_);
+    RETURN_NOT_OK(CheckHasNotBeenStoppedUnlocked());
+  }
   RowSetsInCompaction input;
   shared_ptr<MemRowSet> old_mrs;
   {
@@ -1665,8 +1669,7 @@ Status Tablet::Compact(CompactFlags flags) {
 
   input.DumpToLog();
 
-  return DoMergeCompactionOrFlush(input,
-                                  TabletMetadata::kNoMrsFlushed);
+  return DoMergeCompactionOrFlush(input, TabletMetadata::kNoMrsFlushed);
 }
 
 void Tablet::UpdateCompactionStats(MaintenanceOpStats* stats) {
@@ -1874,6 +1877,7 @@ void Tablet::GetInfoForBestDMSToFlush(const ReplaySizeMap& replay_size_map,
 }
 
 Status Tablet::FlushDMSWithHighestRetention(const ReplaySizeMap& replay_size_map) const {
+  RETURN_IF_STOPPED_OR_CHECK_STATE(kOpen);
   shared_ptr<RowSet> rowset = FindBestDMSToFlush(replay_size_map);
   if (rowset) {
     return rowset->FlushDeltas();
@@ -2112,6 +2116,7 @@ Status Tablet::InitAncientUndoDeltas(MonoDelta time_budget, int64_t* bytes_in_an
 }
 
 Status Tablet::DeleteAncientUndoDeltas(int64_t* blocks_deleted, int64_t* bytes_deleted) {
+  RETURN_IF_STOPPED_OR_CHECK_STATE(kOpen);
   MonoTime tablet_delete_start = MonoTime::Now();
 
   Timestamp ancient_history_mark;

http://git-wip-us.apache.org/repos/asf/kudu/blob/ae0143ca/src/kudu/tablet/tablet_replica.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica.cc b/src/kudu/tablet/tablet_replica.cc
index 3dcc68c..05c8e79 100644
--- a/src/kudu/tablet/tablet_replica.cc
+++ b/src/kudu/tablet/tablet_replica.cc
@@ -686,6 +686,13 @@ void TabletReplica::RegisterMaintenanceOps(MaintenanceManager* maint_mgr) {
   tablet_->RegisterMaintenanceOps(maint_mgr);
 }
 
+void TabletReplica::CancelMaintenanceOpsForTests() {
+  std::lock_guard<simple_spinlock> l(state_change_lock_);
+  for (MaintenanceOp* op : maintenance_ops_) {
+    op->CancelAndDisable();
+  }
+}
+
 void TabletReplica::UnregisterMaintenanceOps() {
   DCHECK(state_change_lock_.is_locked());
   for (MaintenanceOp* op : maintenance_ops_) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/ae0143ca/src/kudu/tablet/tablet_replica.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica.h b/src/kudu/tablet/tablet_replica.h
index ca2a2eb..5cc7c9f 100644
--- a/src/kudu/tablet/tablet_replica.h
+++ b/src/kudu/tablet/tablet_replica.h
@@ -271,6 +271,10 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
   // This method is not thread safe.
   void UnregisterMaintenanceOps();
 
+  // Cancels the maintenance ops associated with this replica's tablet.
+  // Only to be used in tests.
+  void CancelMaintenanceOpsForTests();
+
   // Return pointer to the transaction tracker for this peer.
   const TransactionTracker* transaction_tracker() const { return &txn_tracker_; }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/ae0143ca/src/kudu/tablet/tablet_replica_mm_ops.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica_mm_ops.cc b/src/kudu/tablet/tablet_replica_mm_ops.cc
index e643abd..41f04a3 100644
--- a/src/kudu/tablet/tablet_replica_mm_ops.cc
+++ b/src/kudu/tablet/tablet_replica_mm_ops.cc
@@ -141,8 +141,7 @@ void FlushMRSOp::Perform() {
 
   Status s = tablet->FlushUnlocked();
   if (PREDICT_FALSE(!s.ok())) {
-    LOG(WARNING) << Substitute("FlushMRS failed on $0: $1",
-                               tablet->tablet_id(), s.ToString());
+    LOG(WARNING) << tablet->LogPrefix() << "failed to flush MRS: " << s.ToString();
     CHECK(tablet->HasBeenStopped()) << "FlushMRS failure is only allowed if the "
                                        "tablet is stopped first";
     return;
@@ -194,10 +193,15 @@ void FlushDeltaMemStoresOp::Perform() {
                  << tablet_replica_->tablet_id();
     return;
   }
-  KUDU_CHECK_OK_PREPEND(tablet_replica_->tablet()->FlushDMSWithHighestRetention(
-                            max_idx_to_replay_size),
-                            Substitute("Failed to flush DMS on $0",
-                                       tablet_replica_->tablet()->tablet_id()));
+  Tablet* tablet = tablet_replica_->tablet();
+  Status s = tablet->FlushDMSWithHighestRetention(max_idx_to_replay_size);
+  if (PREDICT_FALSE(!s.ok())) {
+    LOG(WARNING) << tablet->LogPrefix() << "failed to flush DMS: " << s.ToString();
+    CHECK(tablet->HasBeenStopped()) << "FlushDMS failure is only allowed if the "
+                                       "tablet is stopped first";
+    return;
+  }
+
   {
     std::lock_guard<simple_spinlock> l(lock_);
     time_since_flush_.start();