You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2017/11/27 06:16:25 UTC

[1/6] kudu git commit: tablet: mark delta tracker read-only on error

Repository: kudu
Updated Branches:
  refs/heads/master 09fb50224 -> 88e39bad1


tablet: mark delta tracker read-only on error

When DeltaTracker::Flush() fails, it leaves a DeltaMemStore in the redo
store list. This is fine for reads because the readpath expects any
DeltaStore to be in that slot, but this is a problem for updates, which
expect only DeltaFileReaders in the list while the compact_flush_lock_
is held.

Before, we would get around this by CHECKing so Flush() could never
return in such a state. This patch introduces a flag to the
DeltaTracker that indicates whether it has experienced such an error.
In order to ensure type-correctness, this flag must be checked
immediately upon entering the critical section of the
compact_flush_lock_.

Change-Id: Ib950048e9cd0929a10714ab1cc2bd829835afced
Reviewed-on: http://gerrit.cloudera.org:8080/8605
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mp...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/22987c73
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/22987c73
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/22987c73

Branch: refs/heads/master
Commit: 22987c739ed131861f156c3014eb4e429a53dd2a
Parents: 09fb502
Author: Andrew Wong <aw...@cloudera.com>
Authored: Sat Nov 18 08:57:18 2017 -0800
Committer: Andrew Wong <aw...@cloudera.com>
Committed: Thu Nov 23 00:27:52 2017 +0000

----------------------------------------------------------------------
 src/kudu/tablet/delta_tracker.cc | 27 ++++++++++++++++++++++-----
 src/kudu/tablet/delta_tracker.h  | 13 +++++++++++++
 src/kudu/tablet/diskrowset.cc    |  1 +
 3 files changed, 36 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/22987c73/src/kudu/tablet/delta_tracker.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_tracker.cc b/src/kudu/tablet/delta_tracker.cc
index 86cea57..ada9c88 100644
--- a/src/kudu/tablet/delta_tracker.cc
+++ b/src/kudu/tablet/delta_tracker.cc
@@ -37,6 +37,7 @@
 #include "kudu/fs/fs_manager.h"
 #include "kudu/gutil/basictypes.h"
 #include "kudu/gutil/casts.h"
+#include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/tablet/delta_applier.h"
@@ -93,6 +94,7 @@ DeltaTracker::DeltaTracker(shared_ptr<RowSetMetadata> rowset_metadata,
     : rowset_metadata_(std::move(rowset_metadata)),
       num_rows_(num_rows),
       open_(false),
+      read_only_(false),
       log_anchor_registry_(log_anchor_registry),
       mem_trackers_(std::move(mem_trackers)),
       dms_empty_(true) {}
@@ -350,11 +352,20 @@ Status DeltaTracker::CommitDeltaStoreMetadataUpdate(const RowSetMetadataUpdate&
   return Status::OK();
 }
 
+Status DeltaTracker::CheckWritableUnlocked() const {
+  compact_flush_lock_.AssertAcquired();
+  if (PREDICT_FALSE(read_only_)) {
+    return Status::IllegalState("delta tracker has been marked read-only");
+  }
+  return Status::OK();
+}
+
 Status DeltaTracker::CompactStores(int start_idx, int end_idx) {
   // Prevent concurrent compactions or a compaction concurrent with a flush
   //
   // TODO(perf): this could be more fine grained
   std::lock_guard<Mutex> l(compact_flush_lock_);
+  RETURN_NOT_OK(CheckWritableUnlocked());
 
   // At the time of writing, minor delta compaction only compacts REDO delta
   // files, so we need at least 2 REDO delta stores to proceed.
@@ -459,6 +470,7 @@ Status DeltaTracker::DeleteAncientUndoDeltas(Timestamp ancient_history_mark,
   // we need to be the only thread doing a flush or a compaction on this RowSet
   // while we do our work.
   std::lock_guard<Mutex> l(compact_flush_lock_);
+  RETURN_NOT_OK(CheckWritableUnlocked());
 
   // Get the list of undo deltas.
   SharedDeltaStoreVector undos_newest_first;
@@ -679,6 +691,7 @@ Status DeltaTracker::FlushDMS(DeltaMemStore* dms,
 
 Status DeltaTracker::Flush(MetadataFlushType flush_type) {
   std::lock_guard<Mutex> l(compact_flush_lock_);
+  RETURN_NOT_OK(CheckWritableUnlocked());
 
   // First, swap out the old DeltaMemStore a new one,
   // and add it to the list of delta stores to be reflected
@@ -714,15 +727,19 @@ Status DeltaTracker::Flush(MetadataFlushType flush_type) {
   LOG_WITH_PREFIX(INFO) << "Flushing " << count << " deltas from DMS " << old_dms->id() << "...";
 
   // Now, actually flush the contents of the old DMS.
-  // TODO(awong): failures here leaves a DeltaMemStore permanently in the store
-  // list. For now, handle this by CHECKing for success, but we're going to
-  // want a more concrete solution if, say, we want to handle arbitrary file
-  // errors.
   //
   // TODO(todd): need another lock to prevent concurrent flushers
   // at some point.
   shared_ptr<DeltaFileReader> dfr;
-  CHECK_OK(FlushDMS(old_dms.get(), &dfr, flush_type));
+  Status s = FlushDMS(old_dms.get(), &dfr, flush_type);
+  if (PREDICT_FALSE(!s.ok())) {
+    // A failure here leaves a DeltaMemStore permanently in the store list.
+    // This isn't allowed, and rolling back the store is difficult, so we leave
+    // the delta tracker in an safe, read-only state.
+    CHECK(s.IsDiskFailure()) << LogPrefix() << s.ToString();
+    read_only_ = true;
+    return s;
+  }
 
   // Now, re-take the lock and swap in the DeltaFileReader in place of
   // of the DeltaMemStore

http://git-wip-us.apache.org/repos/asf/kudu/blob/22987c73/src/kudu/tablet/delta_tracker.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_tracker.h b/src/kudu/tablet/delta_tracker.h
index bb9bcec..f7388c7 100644
--- a/src/kudu/tablet/delta_tracker.h
+++ b/src/kudu/tablet/delta_tracker.h
@@ -259,6 +259,12 @@ class DeltaTracker {
     return &compact_flush_lock_;
   }
 
+  // Returns an error if the delta tracker has been marked read-only.
+  // Else, returns OK.
+  //
+  // 'compact_flush_lock_' must be held when this is called.
+  Status CheckWritableUnlocked() const;
+
   // Init() all of the specified delta stores. For tests only.
   Status InitAllDeltaStoresForTests(WhichStores stores);
 
@@ -321,6 +327,13 @@ class DeltaTracker {
 
   bool open_;
 
+  // Certain errors (e.g. failed delta tracker flushes) may leave the delta
+  // store lists in a state such that it would be unsafe to run further
+  // maintenance ops.
+  //
+  // Must be checked immediately after locking compact_flush_lock_.
+  bool read_only_;
+
   log::LogAnchorRegistry* log_anchor_registry_;
 
   TabletMemTrackers mem_trackers_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/22987c73/src/kudu/tablet/diskrowset.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/diskrowset.cc b/src/kudu/tablet/diskrowset.cc
index dfa5801..adabb79 100644
--- a/src/kudu/tablet/diskrowset.cc
+++ b/src/kudu/tablet/diskrowset.cc
@@ -564,6 +564,7 @@ Status DiskRowSet::MajorCompactDeltaStoresWithColumnIds(const vector<ColumnId>&
   LOG_WITH_PREFIX(INFO) << "Major compacting REDO delta stores (cols: " << col_ids << ")";
   TRACE_EVENT0("tablet", "DiskRowSet::MajorCompactDeltaStoresWithColumnIds");
   std::lock_guard<Mutex> l(*delta_tracker()->compact_flush_lock());
+  RETURN_NOT_OK(delta_tracker()->CheckWritableUnlocked());
 
   // TODO(todd): do we need to lock schema or anything here?
   gscoped_ptr<MajorDeltaCompaction> compaction;


[3/6] kudu git commit: fix warning in stop_tablet-itest

Posted by mp...@apache.org.
fix warning in stop_tablet-itest

This patch fixes the WARN_UNUSED_RESULT() warning logged by
stop_tablet-itest.

Change-Id: I37f3d438b3a355e18012be6ce277256919fbd2df
Reviewed-on: http://gerrit.cloudera.org:8080/8634
Reviewed-by: Mike Percy <mp...@apache.org>
Tested-by: Andrew Wong <aw...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/b3904330
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/b3904330
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/b3904330

Branch: refs/heads/master
Commit: b3904330ac84373ec4564d165a32e64273cee841
Parents: 1c74247
Author: Andrew Wong <aw...@cloudera.com>
Authored: Wed Nov 22 10:14:33 2017 -0800
Committer: Andrew Wong <aw...@cloudera.com>
Committed: Thu Nov 23 02:38:58 2017 +0000

----------------------------------------------------------------------
 src/kudu/integration-tests/stop_tablet-itest.cc | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/b3904330/src/kudu/integration-tests/stop_tablet-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/stop_tablet-itest.cc b/src/kudu/integration-tests/stop_tablet-itest.cc
index 04b57b6..dfcf836 100644
--- a/src/kudu/integration-tests/stop_tablet-itest.cc
+++ b/src/kudu/integration-tests/stop_tablet-itest.cc
@@ -192,8 +192,8 @@ TEST_P(StopTabletITest, TestSingleStoppedTabletsDontScan) {
   // tablet has been stopped, it shouldn't return anything.
   // Even a fault tolerant scanner will not be able to do anything.
   KuduScanner scanner(table.get());
-  scanner.SetTimeoutMillis(500);
-  scanner.SetFaultTolerant();
+  ASSERT_OK(scanner.SetTimeoutMillis(500));
+  ASSERT_OK(scanner.SetFaultTolerant());
   Status s = scanner.Open();
   LOG(INFO) << "Scanner opened with status: " << s.ToString();
   ASSERT_TRUE(s.IsTimedOut()) << s.ToString();


[2/6] kudu git commit: error_manager: synchronize/serialize handling

Posted by mp...@apache.org.
error_manager: synchronize/serialize handling

The FsErrorManager is helper infrastructure that other classes can use to
help provide API contracts related to error handling.

Here is an example error-handling contract provided to the TSTabletManager
in a future patch: if a disk failure Status is returned during tablet
operation, either:
1) the tablet server will crash and we can rely on Kudu's crash-consistency
   mechanisms for safety, or
2) any affected Tablet will transition to the 'kStopped' state via an error
   manager callback. Most components will simply pass the non-OK Status up
   the call chain. However, if a method expects an IO operation to
   succeed for correctness purposes, and it receives an error Status,
   then it should check that the Tablet is stopped. If so, it can be
   assumed that the error was handled. Otherwise, Kudu must rely on the
   crash-consistency mechanisms and crash.

Given the above contract, the state of a tablet server post-disk-failure
depends significantly on the completion of disk-failure-handling
callbacks. Error-handling _must_ finish before anything is propagated
back to the offending caller.

To ensure correct completion of error-handling even in a concurrent
setting, this patch extends the error manager with a mutex and a second
error-handling callback type. This ensures that errors indirectly caused
by disk failures can be handled by non-disk-specific handling,
serializing failure-handling in the same fashion.

As an example of where this is necessary, say a tablet has data in a
single directory and hits a bad disk. That directory is immediately
marked failed and handling starts to fail all tablets in the directory.
Before, if the tablet were to create a new block before being failed, it
would fail immediately, complaining that no directories are available,
and would eventually fail a CHECK that translates roughly to: "Has error
handling for this tablet completed?" By wrapping block creation with
tablet-specific error handling and with serialized error-handling, this
CHECK will pass.

A previous revision accomplished this by using a single-threaded
threadpool to trigger error-handling and ensuring completion of
error-handling by waiting on the threadpool. I ultimately went with this
implementation since it's a bit more straightforward wrt when to make the
different calls (i.e. "trigger disk error-handling vs trigger tablet
error-handling", instead of "trigger error-handling vs wait for
error-handling to finish"). This also has the benefit of being
extendable for other kinds of errors in the future.

Change-Id: Ie61c408a0b4424f933f40a31147568c2f906be0e
Reviewed-on: http://gerrit.cloudera.org:8080/8395
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mp...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/1c742470
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/1c742470
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/1c742470

Branch: refs/heads/master
Commit: 1c7424706a22ec93ab98086025e4154b815f32b3
Parents: 22987c7
Author: Andrew Wong <aw...@cloudera.com>
Authored: Wed Oct 25 14:15:11 2017 -0700
Committer: Andrew Wong <aw...@cloudera.com>
Committed: Thu Nov 23 00:43:02 2017 +0000

----------------------------------------------------------------------
 src/kudu/fs/CMakeLists.txt            |   2 +
 src/kudu/fs/data_dirs-test.cc         |   8 +-
 src/kudu/fs/error_manager-test.cc     | 199 +++++++++++++++++++++++++++++
 src/kudu/fs/error_manager.cc          |  84 ++++++++++++
 src/kudu/fs/error_manager.h           |  70 ++++++----
 src/kudu/fs/file_block_manager.cc     |  21 ++-
 src/kudu/fs/fs_manager.cc             |  14 +-
 src/kudu/fs/fs_manager.h              |   6 +-
 src/kudu/fs/log_block_manager-test.cc |   4 +-
 src/kudu/fs/log_block_manager.cc      |  27 ++--
 src/kudu/tserver/tablet_server.cc     |   8 +-
 src/kudu/util/status.h                |  10 ++
 12 files changed, 392 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/1c742470/src/kudu/fs/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/fs/CMakeLists.txt b/src/kudu/fs/CMakeLists.txt
index 0074082..b4dda2b 100644
--- a/src/kudu/fs/CMakeLists.txt
+++ b/src/kudu/fs/CMakeLists.txt
@@ -31,6 +31,7 @@ add_library(kudu_fs
   block_manager_metrics.cc
   block_manager_util.cc
   data_dirs.cc
+  error_manager.cc
   file_block_manager.cc
   fs_manager.cc
   fs_report.cc
@@ -56,6 +57,7 @@ ADD_KUDU_TEST(block_manager-test)
 ADD_KUDU_TEST(block_manager_util-test)
 ADD_KUDU_TEST(block_manager-stress-test RUN_SERIAL true)
 ADD_KUDU_TEST(data_dirs-test)
+ADD_KUDU_TEST(error_manager-test)
 ADD_KUDU_TEST(fs_manager-test)
 if (NOT APPLE)
   # Will only pass on Linux.

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c742470/src/kudu/fs/data_dirs-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/data_dirs-test.cc b/src/kudu/fs/data_dirs-test.cc
index e658157..d35facc 100644
--- a/src/kudu/fs/data_dirs-test.cc
+++ b/src/kudu/fs/data_dirs-test.cc
@@ -394,8 +394,8 @@ TEST_F(DataDirManagerTest, TestOpenWithFailedDirs) {
   for (const string& test_root : test_roots_) {
     ASSERT_OK(env_->CreateDir(test_root));
   }
-  ASSERT_OK(DataDirManager::CreateNewForTests(env_, test_roots_,
-      DataDirManagerOptions(), &dd_manager_));
+  ASSERT_OK(DataDirManager::CreateNewForTests(
+      env_, test_roots_, DataDirManagerOptions(), &dd_manager_));
 
   // Kill the first non-metadata directory.
   FLAGS_crash_on_eio = false;
@@ -441,8 +441,8 @@ TEST_F(TooManyDataDirManagerTest, TestTooManyInternedStrings) {
   for (const auto& r : test_roots_) {
     ASSERT_OK(env_->CreateDir(r));
   }
-  ASSERT_OK(DataDirManager::CreateNewForTests(
-      env_, test_roots_, DataDirManagerOptions(), &dd_manager_));
+  ASSERT_OK(DataDirManager::CreateNewForTests(env_, test_roots_,
+      DataDirManagerOptions(), &dd_manager_));
 }
 
 } // namespace fs

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c742470/src/kudu/fs/error_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/error_manager-test.cc b/src/kudu/fs/error_manager-test.cc
new file mode 100644
index 0000000..0c06f0f
--- /dev/null
+++ b/src/kudu/fs/error_manager-test.cc
@@ -0,0 +1,199 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <map>
+#include <memory>
+#include <set>
+#include <sstream>
+#include <stdlib.h>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/fs/error_manager.h"
+#include "kudu/gutil/bind.h"
+#include "kudu/gutil/bind_helpers.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/threading/thread_collision_warner.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/test_util.h"
+
+using std::map;
+using std::set;
+using std::string;
+using std::thread;
+using std::unique_ptr;
+using std::vector;
+
+const int kVecSize = 10;
+
+namespace kudu {
+namespace fs {
+
+// These tests are designed to ensure callback serialization by using callbacks
+// that update a single vector.
+class FsErrorManagerTest : public KuduTest {
+ public:
+  void SetUp() override {
+    KuduTest::SetUp();
+    em_.reset(new FsErrorManager());
+    test_vec_.resize(kVecSize, -1);
+  }
+
+  // Returns a stringified version of the single vector that each thread is
+  // updating.
+  string test_vec_string() {
+    return JoinInts(test_vec_, " ");
+  }
+
+  // Sleeps for a random amount of time, up to 500ms.
+  void SleepForRand() {
+    SleepFor(MonoDelta::FromMilliseconds(rand() % 500));
+  }
+
+  // Returns the index of the first instance of 'k' in test_vec_.
+  int FindFirst(int k) {
+    int first = -1;
+    for (int i = 0; i < test_vec_.size(); i++) {
+      if (test_vec_[i] == k) {
+        first = i;
+        break;
+      }
+    }
+    return first;
+  }
+
+  // Writes i to the first available (-1) entry in test_vec_ after sleeping a
+  // random amount of time. If multiple calls to this are running at the same
+  // time, it is likely that some will write to the same entry.
+  //
+  // NOTE: this can be curried into an ErrorNotificationCb.
+  void SleepAndWriteFirstEmptyCb(int i, const string& /* s */) {
+    DFAKE_SCOPED_LOCK(fake_lock_);
+    int first_available = FindFirst(-1);
+    SleepForRand();
+    if (first_available == -1) {
+      LOG(INFO) << "No available entries!";
+      return;
+    }
+    test_vec_[first_available] = i;
+  }
+
+  // Returns a map between each unique value in test_vec_ and the indices
+  // within test_vec_ at which the value is located.
+  map<int, set<int>> GetPositions() {
+    map<int, set<int>> positions;
+    for (int i = 0; i < test_vec_.size(); i++) {
+      positions[test_vec_[i]].insert(i);
+    }
+    return positions;
+  }
+
+  FsErrorManager* em() const { return em_.get(); }
+
+ protected:
+  // The single vector that the error notification callbacks will all write to.
+  vector<int> test_vec_;
+
+ private:
+  unique_ptr<FsErrorManager> em_;
+
+  // Fake lock used to ensure threads don't run error-handling at the same time.
+  DFAKE_MUTEX(fake_lock_);
+};
+
+// Tests the basic functionality (i.e. registering, unregistering, calling
+// callbacks) of the error manager.
+TEST_F(FsErrorManagerTest, TestBasicRegistration) {
+  // Before registering anything, there should be all '-1's in test_vec_.
+  ASSERT_EQ(-1, FindFirst(ErrorHandlerType::DISK));
+  ASSERT_EQ(-1, FindFirst(ErrorHandlerType::TABLET));
+
+  // Register a callback to update the first '-1' entry in test_vec_ to '0'
+  // after waiting a random amount of time.
+  em()->SetErrorNotificationCb(ErrorHandlerType::DISK,
+      Bind(&FsErrorManagerTest::SleepAndWriteFirstEmptyCb,
+           Unretained(this), ErrorHandlerType::DISK));
+  em()->RunErrorNotificationCb(ErrorHandlerType::DISK, "");
+  ASSERT_EQ(0, FindFirst(ErrorHandlerType::DISK));
+
+  // Running callbacks that haven't been registered should do nothing.
+  em()->RunErrorNotificationCb(ErrorHandlerType::TABLET, "");
+  ASSERT_EQ(0, FindFirst(ErrorHandlerType::DISK));
+  ASSERT_EQ(-1, FindFirst(ErrorHandlerType::TABLET));
+
+  // Now register another callback.
+  em()->SetErrorNotificationCb(ErrorHandlerType::TABLET,
+      Bind(&FsErrorManagerTest::SleepAndWriteFirstEmptyCb,
+           Unretained(this), ErrorHandlerType::TABLET));
+  em()->RunErrorNotificationCb(ErrorHandlerType::TABLET, "");
+  ASSERT_EQ(1, FindFirst(ErrorHandlerType::TABLET));
+
+  // Now unregister one of the callbacks. This should not affect the other.
+  em()->UnsetErrorNotificationCb(ErrorHandlerType::DISK);
+  em()->RunErrorNotificationCb(ErrorHandlerType::DISK, "");
+  em()->RunErrorNotificationCb(ErrorHandlerType::TABLET, "");
+
+  LOG(INFO) << "Final state of the vector: " << test_vec_string();
+  map<int, set<int>> positions = GetPositions();
+  set<int> disk_set = { 0 };        // The first entry should be DISK...
+  set<int> tablet_set = { 1, 2 };   // ...followed by TABLET, TABLET.
+  ASSERT_EQ(disk_set, FindOrDie(positions, ErrorHandlerType::DISK));
+  ASSERT_EQ(tablet_set, FindOrDie(positions, ErrorHandlerType::TABLET));
+}
+
+// Test that the callbacks get run serially.
+TEST_F(FsErrorManagerTest, TestSerialization) {
+  em()->SetErrorNotificationCb(ErrorHandlerType::DISK,
+      Bind(&FsErrorManagerTest::SleepAndWriteFirstEmptyCb,
+           Unretained(this), ErrorHandlerType::DISK));
+  em()->SetErrorNotificationCb(ErrorHandlerType::TABLET,
+      Bind(&FsErrorManagerTest::SleepAndWriteFirstEmptyCb,
+           Unretained(this), ErrorHandlerType::TABLET));
+
+  // Swap back and forth between error-handler type.
+  const auto IntToEnum = [&] (int i) {
+    return i % 2 == 0 ? ErrorHandlerType::DISK : ErrorHandlerType::TABLET;
+  };
+
+  vector<thread> cb_threads;
+  for (int i = 0; i < kVecSize; i++) {
+    // Each call will update the first available entry in test_vec_ after
+    // waiting a random amount of time. Without proper serialization, these
+    // could end up writing to the same entry.
+    cb_threads.emplace_back([&,i] {
+      em()->RunErrorNotificationCb(IntToEnum(i), "");
+    });
+  }
+  for (auto& t : cb_threads) {
+    t.join();
+  }
+  LOG(INFO) << "Final state of the vector: " << test_vec_string();
+
+  // Because the callbacks ran serially, all threads should have been able to
+  // get a unique slot in test_vec_, and thus, all entries should be filled.
+  // Note that the order of the calls does not matter, only the fact that they
+  // ran serially.
+  CHECK_EQ(-1, FindFirst(-1));
+}
+
+}  // namespace fs
+}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c742470/src/kudu/fs/error_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/error_manager.cc b/src/kudu/fs/error_manager.cc
new file mode 100644
index 0000000..dd83f18
--- /dev/null
+++ b/src/kudu/fs/error_manager.cc
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <mutex>
+#include <ostream>
+#include <string>
+#include <utility>
+
+#include <glog/logging.h>
+
+#include "kudu/fs/error_manager.h"
+#include "kudu/gutil/bind.h"
+
+using std::string;
+
+namespace kudu {
+
+namespace fs {
+
+// Default error-handling callback that no-ops.
+static void DoNothingErrorNotification(const string& /* uuid */) {}
+
+FsErrorManager::FsErrorManager() :
+  disk_cb_(Bind(DoNothingErrorNotification)),
+  tablet_cb_(Bind(DoNothingErrorNotification)) {}
+
+void FsErrorManager::SetErrorNotificationCb(ErrorHandlerType e, ErrorNotificationCb cb) {
+  std::lock_guard<Mutex> l(lock_);
+  switch (e) {
+    case ErrorHandlerType::DISK:
+      disk_cb_ = std::move(cb);
+      return;
+    case ErrorHandlerType::TABLET:
+      tablet_cb_ = std::move(cb);
+      return;
+    default:
+      LOG(FATAL) << "Unknown error handler type!";
+  }
+}
+
+void FsErrorManager::UnsetErrorNotificationCb(ErrorHandlerType e) {
+  std::lock_guard<Mutex> l(lock_);
+  switch (e) {
+    case ErrorHandlerType::DISK:
+      disk_cb_ = Bind(DoNothingErrorNotification);
+      return;
+    case ErrorHandlerType::TABLET:
+      tablet_cb_ = Bind(DoNothingErrorNotification);
+      return;
+    default:
+      LOG(FATAL) << "Unknown error handler type!";
+  }
+}
+
+void FsErrorManager::RunErrorNotificationCb(ErrorHandlerType e, const string& uuid) const {
+  std::lock_guard<Mutex> l(lock_);
+  switch (e) {
+    case ErrorHandlerType::DISK:
+      disk_cb_.Run(uuid);
+      return;
+    case ErrorHandlerType::TABLET:
+      tablet_cb_.Run(uuid);
+      return;
+    default:
+      LOG(FATAL) << "Unknown error handler type!";
+  }
+}
+
+}  // namespace fs
+}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c742470/src/kudu/fs/error_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/error_manager.h b/src/kudu/fs/error_manager.h
index cf0fdf7..a4e61c1 100644
--- a/src/kudu/fs/error_manager.h
+++ b/src/kudu/fs/error_manager.h
@@ -17,16 +17,16 @@
 
 #pragma once
 
-#include <map>
 #include <string>
 
+#include <glog/logging.h>
+
 #include "kudu/fs/block_manager_util.h"
 #include "kudu/fs/data_dirs.h"
-#include "kudu/gutil/bind.h"
-#include "kudu/gutil/callback_forward.h"
-#include "kudu/gutil/strings/substitute.h"
-#include "kudu/util/fault_injection.h"
-#include "kudu/util/status.h"
+#include "kudu/fs/fs.pb.h"
+#include "kudu/gutil/callback.h"
+#include "kudu/gutil/port.h"
+#include "kudu/util/mutex.h"
 
 namespace kudu {
 namespace fs {
@@ -37,7 +37,6 @@ namespace fs {
 // e.g. the ErrorNotificationCb for disk failure handling takes the UUID of a
 // directory, marks it failed, and shuts down the tablets in that directory.
 typedef Callback<void(const std::string&)> ErrorNotificationCb;
-static void DoNothingErrorNotification(const std::string& /* uuid */) {}
 
 // Evaluates the expression and handles it if it results in an error.
 // Returns if the status is an error.
@@ -72,47 +71,70 @@ static void DoNothingErrorNotification(const std::string& /* uuid */) {}
   } \
 } while (0);
 
-// When certain operations fail, the side effects of the error can span
-// multiple layers, many of which we prefer to keep separate. The FsErrorManager
+enum ErrorHandlerType {
+  // For errors that affect a disk and all of its tablets (e.g. disk failure).
+  DISK,
+
+  // For errors that affect a single tablet (e.g. failure to create a block).
+  TABLET
+};
+
+// When certain operations fail, the side effects of the error can span multiple
+// layers, many of which we prefer to keep separate. The FsErrorManager
 // registers and runs error handlers without adding cross-layer dependencies.
+// Additionally, it enforces one callback is run at a time, and that each
+// callback fully completes before returning.
 //
 // e.g. the TSTabletManager registers a callback to handle disk failure.
 // Blocks and other entities that may hit disk failures can call it without
 // knowing about the TSTabletManager.
 class FsErrorManager {
  public:
-  FsErrorManager()
-    : notify_cb_(Bind(DoNothingErrorNotification)) {}
+  // TODO(awong): Register an actual error-handling function for tablet. Some
+  // errors may surface indirectly due to disk errors, but may not
+  // necessarily be caused by the failure of a specific disk.
+  //
+  // For example, if all of the disks in a tablet's directory group have
+  // already failed, the tablet would not be able to create a new block and
+  // return an error, despite CreateNewBlock() not actually touching disk.
+  // Before CreateNewBlock() returns, in order to satisfy various saftey
+  // checks surrounding the state of tablet post-failure, it must wait for
+  // disk failure handling of the failed disks to return.
+  //
+  // While this callback is a no-op, it serves to enforce that any
+  // error-handling caused by ERROR1 that may have indirectly caused ERROR2
+  // must complete before ERROR2 can be returned to its caller.
+  FsErrorManager();
 
   // Sets the error notification callback.
   //
   // This should be called when the callback's callee is initialized.
-  void SetErrorNotificationCb(ErrorNotificationCb cb) {
-    notify_cb_ = std::move(cb);
-  }
+  void SetErrorNotificationCb(ErrorHandlerType e, ErrorNotificationCb cb);
 
   // Resets the error notification callback.
   //
   // This must be called before the callback's callee is destroyed.
-  void UnsetErrorNotificationCb() {
-    notify_cb_ = Bind(DoNothingErrorNotification);
-  }
+  void UnsetErrorNotificationCb(ErrorHandlerType e);
 
   // Runs the error notification callback.
   //
   // 'uuid' is the full UUID of the component that failed.
-  void RunErrorNotificationCb(const std::string& uuid) const {
-    notify_cb_.Run(uuid);
-  }
+  void RunErrorNotificationCb(ErrorHandlerType e, const std::string& uuid) const;
 
   // Runs the error notification callback with the UUID of 'dir'.
-  void RunErrorNotificationCb(const DataDir* dir) const {
-    notify_cb_.Run(dir->instance()->metadata()->path_set().uuid());
+  void RunErrorNotificationCb(ErrorHandlerType e, const DataDir* dir) const {
+    DCHECK_EQ(e, ErrorHandlerType::DISK);
+    RunErrorNotificationCb(e, dir->instance()->metadata()->path_set().uuid());
   }
 
  private:
-   // Callback to be run when an error occurs.
-   ErrorNotificationCb notify_cb_;
+   // Callbacks to be run when an error occurs.
+  ErrorNotificationCb disk_cb_;
+  ErrorNotificationCb tablet_cb_;
+
+   // Protects calls to notifications, enforcing that a single callback runs at
+   // a time.
+   mutable Mutex lock_;
 };
 
 }  // namespace fs

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c742470/src/kudu/fs/file_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/file_block_manager.cc b/src/kudu/fs/file_block_manager.cc
index dd69c5c..d5452f2 100644
--- a/src/kudu/fs/file_block_manager.cc
+++ b/src/kudu/fs/file_block_manager.cc
@@ -313,7 +313,8 @@ FileWritableBlock::~FileWritableBlock() {
 
 void FileWritableBlock::HandleError(const Status& s) const {
   HANDLE_DISK_FAILURE(
-      s, block_manager_->error_manager()->RunErrorNotificationCb(location_.data_dir()));
+      s, block_manager_->error_manager()->RunErrorNotificationCb(
+          ErrorHandlerType::DISK, location_.data_dir()));
 }
 
 Status FileWritableBlock::Close() {
@@ -471,7 +472,8 @@ class FileReadableBlock : public ReadableBlock {
 void FileReadableBlock::HandleError(const Status& s) const {
   const DataDir* dir = block_manager_->dd_manager_->FindDataDirByUuidIndex(
       internal::FileBlockLocation::GetDataDirIdx(block_id_));
-  HANDLE_DISK_FAILURE(s, block_manager_->error_manager()->RunErrorNotificationCb(dir));
+  HANDLE_DISK_FAILURE(s, block_manager_->error_manager()->RunErrorNotificationCb(
+      ErrorHandlerType::DISK, dir));
 }
 
 FileReadableBlock::FileReadableBlock(FileBlockManager* block_manager,
@@ -666,7 +668,8 @@ Status FileBlockManager::SyncMetadata(const internal::FileBlockLocation& locatio
     for (const string& s : to_sync) {
       if (metrics_) metrics_->total_disk_sync->Increment();
       RETURN_NOT_OK_HANDLE_DISK_FAILURE(env_->SyncDir(s),
-          error_manager_->RunErrorNotificationCb(location.data_dir()));
+          error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK,
+                                                 location.data_dir()));
     }
   }
   return Status::OK();
@@ -740,7 +743,8 @@ Status FileBlockManager::CreateBlock(const CreateBlockOptions& opts,
   CHECK(!opts_.read_only);
 
   DataDir* dir;
-  RETURN_NOT_OK(dd_manager_->GetNextDataDir(opts, &dir));
+  RETURN_NOT_OK_EVAL(dd_manager_->GetNextDataDir(opts, &dir),
+      error_manager_->RunErrorNotificationCb(ErrorHandlerType::TABLET, opts.tablet_id));
   int uuid_idx;
   CHECK(dd_manager_->FindUuidIndexByDataDir(dir, &uuid_idx));
 
@@ -775,7 +779,8 @@ Status FileBlockManager::CreateBlock(const CreateBlockOptions& opts,
     // We could create a block in a different directory, but there's currently
     // no point in doing so. On disk failure, the tablet specified by 'opts'
     // will be shut down, so the returned block would not be used.
-    RETURN_NOT_OK_HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb(dir));
+    RETURN_NOT_OK_HANDLE_DISK_FAILURE(s,
+        error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK, dir));
     WritableFileOptions wr_opts;
     wr_opts.mode = Env::CREATE_NON_EXISTING;
     s = env_util::OpenFileForWrite(wr_opts, env_, path, &writer);
@@ -794,7 +799,8 @@ Status FileBlockManager::CreateBlock(const CreateBlockOptions& opts,
     }
     block->reset(new internal::FileWritableBlock(this, location, writer));
   } else {
-    HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb(dir));
+    HANDLE_DISK_FAILURE(s,
+        error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK, dir));
     return s;
   }
   return Status::OK();
@@ -802,7 +808,8 @@ Status FileBlockManager::CreateBlock(const CreateBlockOptions& opts,
 
 #define RETURN_NOT_OK_FBM_DISK_FAILURE(status_expr) do { \
   RETURN_NOT_OK_HANDLE_DISK_FAILURE((status_expr), \
-      error_manager_->RunErrorNotificationCb(dd_manager_->FindDataDirByUuidIndex( \
+      error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK, \
+      dd_manager_->FindDataDirByUuidIndex( \
       internal::FileBlockLocation::GetDataDirIdx(block_id)))); \
 } while (0);
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c742470/src/kudu/fs/fs_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/fs_manager.cc b/src/kudu/fs/fs_manager.cc
index 34b81ae..230ce71 100644
--- a/src/kudu/fs/fs_manager.cc
+++ b/src/kudu/fs/fs_manager.cc
@@ -95,6 +95,8 @@ using kudu::fs::BlockManagerOptions;
 using kudu::fs::CreateBlockOptions;
 using kudu::fs::DataDirManager;
 using kudu::fs::DataDirManagerOptions;
+using kudu::fs::ErrorHandlerType;
+using kudu::fs::ErrorNotificationCb;
 using kudu::fs::FsErrorManager;
 using kudu::fs::FileBlockManager;
 using kudu::fs::FsReport;
@@ -156,12 +158,12 @@ FsManager::FsManager(Env* env, FsManagerOpts opts)
 
 FsManager::~FsManager() {}
 
-void FsManager::SetErrorNotificationCb(fs::ErrorNotificationCb cb) {
-  error_manager_->SetErrorNotificationCb(std::move(cb));
+void FsManager::SetErrorNotificationCb(ErrorHandlerType e, ErrorNotificationCb cb) {
+  error_manager_->SetErrorNotificationCb(e, std::move(cb));
 }
 
-void FsManager::UnsetErrorNotificationCb() {
-  error_manager_->UnsetErrorNotificationCb();
+void FsManager::UnsetErrorNotificationCb(ErrorHandlerType e) {
+  error_manager_->UnsetErrorNotificationCb(e);
 }
 
 Status FsManager::Init() {
@@ -374,8 +376,8 @@ Status FsManager::Open(FsReport* report) {
   }
 
   // Set an initial error handler to mark data directories as failed.
-  error_manager_->SetErrorNotificationCb(Bind(&DataDirManager::MarkDataDirFailedByUuid,
-                                              Unretained(dd_manager_.get())));
+  error_manager_->SetErrorNotificationCb(ErrorHandlerType::DISK,
+      Bind(&DataDirManager::MarkDataDirFailedByUuid, Unretained(dd_manager_.get())));
 
   // Finally, initialize and open the block manager.
   InitBlockManager();

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c742470/src/kudu/fs/fs_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/fs_manager.h b/src/kudu/fs/fs_manager.h
index cbb08a9..2c0fc21 100644
--- a/src/kudu/fs/fs_manager.h
+++ b/src/kudu/fs/fs_manager.h
@@ -141,13 +141,13 @@ class FsManager {
   //
   // If a disk failure is detected, this callback will be invoked with the
   // relevant DataDir's UUID as its input parameter.
-  void SetErrorNotificationCb(fs::ErrorNotificationCb cb);
+  void SetErrorNotificationCb(fs::ErrorHandlerType e, fs::ErrorNotificationCb cb);
 
   // Unregisters the error-handling callback with the FsErrorManager.
   //
   // This must be called before the callback's callee is destroyed. Calls to
   // this are idempotent and are safe even if a callback has not been set.
-  void UnsetErrorNotificationCb();
+  void UnsetErrorNotificationCb(fs::ErrorHandlerType e);
 
   // Create the initial filesystem layout. If 'uuid' is provided, uses it as
   // uuid of the filesystem. Otherwise generates one at random.
@@ -324,9 +324,9 @@ class FsManager {
 
   std::unique_ptr<InstanceMetadataPB> metadata_;
 
+  std::unique_ptr<fs::FsErrorManager> error_manager_;
   std::unique_ptr<fs::DataDirManager> dd_manager_;
   std::unique_ptr<fs::BlockManager> block_manager_;
-  std::unique_ptr<fs::FsErrorManager> error_manager_;
 
   bool initted_;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c742470/src/kudu/fs/log_block_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager-test.cc b/src/kudu/fs/log_block_manager-test.cc
index 856c004..eca2ccc 100644
--- a/src/kudu/fs/log_block_manager-test.cc
+++ b/src/kudu/fs/log_block_manager-test.cc
@@ -1624,8 +1624,8 @@ TEST_F(LogBlockManagerTest, TestOpenWithFailedDirectories) {
       DataDirManagerOptions(), &dd_manager_));
 
   // Wire in a callback to fail data directories.
-  test_error_manager_->SetErrorNotificationCb(Bind(&DataDirManager::MarkDataDirFailedByUuid,
-                                                   Unretained(dd_manager_.get())));
+  test_error_manager_->SetErrorNotificationCb(ErrorHandlerType::DISK,
+      Bind(&DataDirManager::MarkDataDirFailedByUuid, Unretained(dd_manager_.get())));
   bm_.reset(CreateBlockManager(nullptr));
 
   // Fail one of the directories, chosen randomly.

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c742470/src/kudu/fs/log_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index adca9f3..2958358 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -621,13 +621,13 @@ LogBlockContainer::LogBlockContainer(
 }
 
 void LogBlockContainer::HandleError(const Status& s) const {
-  HANDLE_DISK_FAILURE(
-      s, block_manager()->error_manager()->RunErrorNotificationCb(data_dir_));
+  HANDLE_DISK_FAILURE(s,
+      block_manager()->error_manager()->RunErrorNotificationCb(ErrorHandlerType::DISK, data_dir_));
 }
 
 #define RETURN_NOT_OK_CONTAINER_DISK_FAILURE(status_expr) do { \
   RETURN_NOT_OK_HANDLE_DISK_FAILURE((status_expr), \
-    block_manager->error_manager()->RunErrorNotificationCb(dir)); \
+    block_manager->error_manager()->RunErrorNotificationCb(ErrorHandlerType::DISK, dir)); \
 } while (0);
 
 Status LogBlockContainer::Create(LogBlockManager* block_manager,
@@ -691,8 +691,9 @@ Status LogBlockContainer::Create(LogBlockManager* block_manager,
   }
 
   // Prefer metadata status (arbitrarily).
-  HANDLE_DISK_FAILURE(metadata_status, block_manager->error_manager()->RunErrorNotificationCb(dir));
-  HANDLE_DISK_FAILURE(data_status, block_manager->error_manager()->RunErrorNotificationCb(dir));
+  FsErrorManager* em = block_manager->error_manager();
+  HANDLE_DISK_FAILURE(metadata_status, em->RunErrorNotificationCb(ErrorHandlerType::DISK, dir));
+  HANDLE_DISK_FAILURE(data_status, em->RunErrorNotificationCb(ErrorHandlerType::DISK, dir));
   return !metadata_status.ok() ? metadata_status : data_status;
 }
 
@@ -1926,7 +1927,8 @@ void LogBlockManager::RemoveFullContainerUnlocked(const string& container_name)
 Status LogBlockManager::GetOrCreateContainer(const CreateBlockOptions& opts,
                                              LogBlockContainer** container) {
   DataDir* dir;
-  RETURN_NOT_OK(dd_manager_->GetNextDataDir(opts, &dir));
+  RETURN_NOT_OK_EVAL(dd_manager_->GetNextDataDir(opts, &dir),
+      error_manager_->RunErrorNotificationCb(ErrorHandlerType::TABLET, opts.tablet_id));
 
   {
     std::lock_guard<simple_spinlock> l(lock_);
@@ -1945,7 +1947,7 @@ Status LogBlockManager::GetOrCreateContainer(const CreateBlockOptions& opts,
   // We could create a container in a different directory, but there's
   // currently no point in doing so. On disk failure, the tablet specified by
   // 'opts' will be shut down, so the returned container would not be used.
-  HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb(dir));
+  HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK, dir));
   RETURN_NOT_OK_PREPEND(s, "Could not create new log block container at " + dir->dir());
   {
     std::lock_guard<simple_spinlock> l(lock_);
@@ -2126,7 +2128,7 @@ Status LogBlockManager::RemoveLogBlockUnlocked(const BlockId& block_id,
 
   LogBlockContainer* container = it->second->container();
   HANDLE_DISK_FAILURE(container->read_only_status(),
-                      error_manager_->RunErrorNotificationCb(container->data_dir()));
+      error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK, container->data_dir()));
 
   // Return early if deleting a block in a failed directory.
   set<int> failed_dirs = dd_manager_->GetFailedDataDirs();
@@ -2180,7 +2182,7 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
   vector<string> children;
   Status s = env_->GetChildren(dir->dir(), &children);
   if (!s.ok()) {
-    HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb(dir));
+    HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK, dir));
     *result_status = s.CloneAndPrepend(Substitute(
         "Could not list children of $0", dir->dir()));
     return;
@@ -2333,7 +2335,7 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
       uint64_t reported_size;
       s = env_->GetFileSizeOnDisk(data_filename, &reported_size);
       if (!s.ok()) {
-        HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb(dir));
+        HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK, dir));
         *result_status = s.CloneAndPrepend(Substitute(
             "Could not get on-disk file size of container $0", container->ToString()));
         return;
@@ -2416,12 +2418,13 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
 #define RETURN_NOT_OK_LBM_DISK_FAILURE_PREPEND(status_expr, msg) do { \
   Status s_ = (status_expr); \
   s_ = s_.CloneAndPrepend(msg); \
-  RETURN_NOT_OK_HANDLE_DISK_FAILURE(s_, error_manager_->RunErrorNotificationCb(dir)); \
+  RETURN_NOT_OK_HANDLE_DISK_FAILURE(s_, \
+      error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK, dir)); \
 } while (0);
 
 #define WARN_NOT_OK_LBM_DISK_FAILURE(status_expr, msg) do { \
   Status s_ = (status_expr); \
-  HANDLE_DISK_FAILURE(s_, error_manager_->RunErrorNotificationCb(dir)); \
+  HANDLE_DISK_FAILURE(s_, error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK, dir)); \
   WARN_NOT_OK(s_, msg); \
 } while (0);
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c742470/src/kudu/tserver/tablet_server.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server.cc b/src/kudu/tserver/tablet_server.cc
index 95b2b1e..8fdb85d 100644
--- a/src/kudu/tserver/tablet_server.cc
+++ b/src/kudu/tserver/tablet_server.cc
@@ -25,6 +25,7 @@
 #include <glog/logging.h>
 
 #include "kudu/cfile/block_cache.h"
+#include "kudu/fs/error_manager.h"
 #include "kudu/fs/fs_manager.h"
 #include "kudu/gutil/bind.h"
 #include "kudu/gutil/bind_helpers.h"
@@ -42,6 +43,7 @@
 #include "kudu/util/status.h"
 
 using std::string;
+using kudu::fs::ErrorHandlerType;
 using kudu::rpc::ServiceIf;
 
 namespace kudu {
@@ -112,8 +114,8 @@ Status TabletServer::WaitInited() {
 Status TabletServer::Start() {
   CHECK(initted_);
 
-  fs_manager_->SetErrorNotificationCb(Bind(&TSTabletManager::FailTabletsInDataDir,
-                                           Unretained(tablet_manager_.get())));
+  fs_manager_->SetErrorNotificationCb(ErrorHandlerType::DISK,
+      Bind(&TSTabletManager::FailTabletsInDataDir, Unretained(tablet_manager_.get())));
 
   gscoped_ptr<ServiceIf> ts_service(new TabletServiceImpl(this));
   gscoped_ptr<ServiceIf> admin_service(new TabletServiceAdminImpl(this));
@@ -146,7 +148,7 @@ void TabletServer::Shutdown() {
     // 2. Shut down the tserver's subsystems.
     maintenance_manager_->Shutdown();
     WARN_NOT_OK(heartbeater_->Stop(), "Failed to stop TS Heartbeat thread");
-    fs_manager_->UnsetErrorNotificationCb();
+    fs_manager_->UnsetErrorNotificationCb(ErrorHandlerType::DISK);
     tablet_manager_->Shutdown();
 
     // 3. Shut down generic subsystems.

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c742470/src/kudu/util/status.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/status.h b/src/kudu/util/status.h
index 6318e6d..afcdc8f 100644
--- a/src/kudu/util/status.h
+++ b/src/kudu/util/status.h
@@ -52,6 +52,15 @@
     if (PREDICT_FALSE(!s.ok())) return (to_return);  \
   } while (0);
 
+/// @brief Return the given status if it is not OK, evaluating `on_error` if so.
+#define KUDU_RETURN_NOT_OK_EVAL(s, on_error) do { \
+    const ::kudu::Status& _s = (s); \
+    if (PREDICT_FALSE(!_s.ok())) { \
+      (on_error); \
+      return _s; \
+    } \
+  } while (0);
+
 /// @brief Emit a warning if @c to_call returns a bad status.
 #define KUDU_WARN_NOT_OK(to_call, warning_prefix) do { \
     const ::kudu::Status& _s = (to_call);              \
@@ -115,6 +124,7 @@
 #define RETURN_NOT_OK         KUDU_RETURN_NOT_OK
 #define RETURN_NOT_OK_PREPEND KUDU_RETURN_NOT_OK_PREPEND
 #define RETURN_NOT_OK_RET     KUDU_RETURN_NOT_OK_RET
+#define RETURN_NOT_OK_EVAL    KUDU_RETURN_NOT_OK_EVAL
 #define WARN_NOT_OK           KUDU_WARN_NOT_OK
 #define LOG_AND_RETURN        KUDU_LOG_AND_RETURN
 #define RETURN_NOT_OK_LOG     KUDU_RETURN_NOT_OK_LOG


[4/6] kudu git commit: make registration of maintenance ops thread-safe

Posted by mp...@apache.org.
make registration of maintenance ops thread-safe

Before, access to tablet replicas' maintenance ops was not thread-safe.
This was OK because the TSTabletManager externally synchronizes the
initialization and shutdown of replicas, enabling relatively lax locking
for the ops.

In the future, there will be calls that access the maintenance ops
outside of initialization and shutdown, e.g. from an external thread
that handles disk failures by canceling the affected tablets'
maintenance ops. Making such calls safe requires more stringent locking.

This patch uses TabletReplica's lock_ to synchronize access to the ops.

Change-Id: Ia13051fab85d0f678bd3efdcb69766d66a657cdd
Reviewed-on: http://gerrit.cloudera.org:8080/8635
Reviewed-by: Andrew Wong <aw...@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/5523b0a3
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/5523b0a3
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/5523b0a3

Branch: refs/heads/master
Commit: 5523b0a307133e5e6827f56d9e6045609936a039
Parents: b390433
Author: Andrew Wong <aw...@cloudera.com>
Authored: Wed Nov 22 16:00:27 2017 -0800
Committer: Andrew Wong <aw...@cloudera.com>
Committed: Thu Nov 23 03:48:21 2017 +0000

----------------------------------------------------------------------
 src/kudu/tablet/tablet_replica.cc | 30 +++++++++++++++++++++---------
 src/kudu/tablet/tablet_replica.h  |  7 +++----
 2 files changed, 24 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/5523b0a3/src/kudu/tablet/tablet_replica.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica.cc b/src/kudu/tablet/tablet_replica.cc
index 05c8e79..d44e6b4 100644
--- a/src/kudu/tablet/tablet_replica.cc
+++ b/src/kudu/tablet/tablet_replica.cc
@@ -661,7 +661,7 @@ Status TabletReplica::NewReplicaTransactionDriver(gscoped_ptr<Transaction> trans
 void TabletReplica::RegisterMaintenanceOps(MaintenanceManager* maint_mgr) {
   // Taking state_change_lock_ ensures that we don't shut down concurrently with
   // this last start-up task.
-  std::lock_guard<simple_spinlock> l(state_change_lock_);
+  std::lock_guard<simple_spinlock> state_change_lock(state_change_lock_);
 
   if (state() != RUNNING) {
     LOG(WARNING) << "Not registering maintenance operations for " << tablet_
@@ -669,25 +669,32 @@ void TabletReplica::RegisterMaintenanceOps(MaintenanceManager* maint_mgr) {
     return;
   }
 
-  DCHECK(maintenance_ops_.empty());
+  vector<MaintenanceOp*> maintenance_ops;
 
   gscoped_ptr<MaintenanceOp> mrs_flush_op(new FlushMRSOp(this));
   maint_mgr->RegisterOp(mrs_flush_op.get());
-  maintenance_ops_.push_back(mrs_flush_op.release());
+  maintenance_ops.push_back(mrs_flush_op.release());
 
   gscoped_ptr<MaintenanceOp> dms_flush_op(new FlushDeltaMemStoresOp(this));
   maint_mgr->RegisterOp(dms_flush_op.get());
-  maintenance_ops_.push_back(dms_flush_op.release());
+  maintenance_ops.push_back(dms_flush_op.release());
 
   gscoped_ptr<MaintenanceOp> log_gc(new LogGCOp(this));
   maint_mgr->RegisterOp(log_gc.get());
-  maintenance_ops_.push_back(log_gc.release());
+  maintenance_ops.push_back(log_gc.release());
 
-  tablet_->RegisterMaintenanceOps(maint_mgr);
+  std::shared_ptr<Tablet> tablet;
+  {
+    std::lock_guard<simple_spinlock> l(lock_);
+    DCHECK(maintenance_ops_.empty());
+    maintenance_ops_.swap(maintenance_ops);
+    tablet = tablet_;
+  }
+  tablet->RegisterMaintenanceOps(maint_mgr);
 }
 
 void TabletReplica::CancelMaintenanceOpsForTests() {
-  std::lock_guard<simple_spinlock> l(state_change_lock_);
+  std::lock_guard<simple_spinlock> l(lock_);
   for (MaintenanceOp* op : maintenance_ops_) {
     op->CancelAndDisable();
   }
@@ -695,10 +702,15 @@ void TabletReplica::CancelMaintenanceOpsForTests() {
 
 void TabletReplica::UnregisterMaintenanceOps() {
   DCHECK(state_change_lock_.is_locked());
-  for (MaintenanceOp* op : maintenance_ops_) {
+  vector<MaintenanceOp*> maintenance_ops;
+  {
+    std::lock_guard<simple_spinlock> l(lock_);
+    maintenance_ops.swap(maintenance_ops_);
+  }
+  for (MaintenanceOp* op : maintenance_ops) {
     op->Unregister();
   }
-  STLDeleteElements(&maintenance_ops_);
+  STLDeleteElements(&maintenance_ops);
 }
 
 size_t TabletReplica::OnDiskSize() const {

http://git-wip-us.apache.org/repos/asf/kudu/blob/5523b0a3/src/kudu/tablet/tablet_replica.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica.h b/src/kudu/tablet/tablet_replica.h
index 5cc7c9f..27829ba 100644
--- a/src/kudu/tablet/tablet_replica.h
+++ b/src/kudu/tablet/tablet_replica.h
@@ -265,10 +265,9 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
 
   // Register the maintenance ops associated with this peer's tablet, also invokes
   // Tablet::RegisterMaintenanceOps().
-  void RegisterMaintenanceOps(MaintenanceManager* maintenance_manager);
+  void RegisterMaintenanceOps(MaintenanceManager* maint_mgr);
 
   // Unregister the maintenance ops associated with this replica's tablet.
-  // This method is not thread safe.
   void UnregisterMaintenanceOps();
 
   // Cancels the maintenance ops associated with this replica's tablet.
@@ -337,8 +336,8 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
   std::shared_ptr<consensus::RaftConsensus> consensus_;
   simple_spinlock prepare_replicate_lock_;
 
-  // Lock protecting state_, last_status_, as well as smart pointers to collaborating
-  // classes such as tablet_ and consensus_.
+  // Lock protecting state_, last_status_, as well as pointers to collaborating
+  // classes such as tablet_, consensus_, and maintenance_ops_.
   mutable simple_spinlock lock_;
 
   // The human-readable last status of the tablet, displayed on the web page, command line


[6/6] kudu git commit: KUDU-1097 (patch 1): Make leader report config member health to master

Posted by mp...@apache.org.
KUDU-1097 (patch 1): Make leader report config member health to master

Passes existing tests (when the feature is disabled).

Tests will be added in a follow-up patch.

Change-Id: Ia5081cbe0c0d81733a781d4729211dd0c530cdfa
Reviewed-on: http://gerrit.cloudera.org:8080/8630
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/88e39bad
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/88e39bad
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/88e39bad

Branch: refs/heads/master
Commit: 88e39bad14cfab17882d72b69d3382c219b93c23
Parents: b436845
Author: Mike Percy <mp...@apache.org>
Authored: Tue Nov 21 20:44:18 2017 -0800
Committer: Mike Percy <mp...@apache.org>
Committed: Mon Nov 27 06:13:55 2017 +0000

----------------------------------------------------------------------
 src/kudu/consensus/consensus_queue.cc           | 175 +++++++++++++++----
 src/kudu/consensus/consensus_queue.h            |  28 ++-
 src/kudu/consensus/raft_consensus.cc            |  34 +++-
 src/kudu/consensus/raft_consensus.h             |  18 +-
 .../integration-tests/raft_consensus-itest.cc   |   3 +-
 src/kudu/tserver/ts_tablet_manager.cc           |   8 +-
 6 files changed, 224 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/88e39bad/src/kudu/consensus/consensus_queue.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.cc b/src/kudu/consensus/consensus_queue.cc
index 0bf75a7..b3897f7 100644
--- a/src/kudu/consensus/consensus_queue.cc
+++ b/src/kudu/consensus/consensus_queue.cc
@@ -48,6 +48,7 @@
 #include "kudu/util/logging.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/pb_util.h"
+#include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/threadpool.h"
 #include "kudu/util/url-coding.h"
 
@@ -70,12 +71,14 @@ TAG_FLAG(consensus_inject_latency_ms_in_notifications, unsafe);
 
 DECLARE_int32(consensus_rpc_timeout_ms);
 DECLARE_bool(safe_time_advancement_without_writes);
+DECLARE_bool(raft_prepare_replacement_before_eviction);
 
 using kudu::log::Log;
 using kudu::pb_util::SecureDebugString;
 using kudu::pb_util::SecureShortDebugString;
 using std::string;
 using std::unique_ptr;
+using std::unordered_map;
 using std::vector;
 using strings::Substitute;
 
@@ -244,6 +247,25 @@ void PeerMessageQueue::UntrackPeer(const string& uuid) {
   }
 }
 
+unordered_map<string, HealthReportPB> PeerMessageQueue::ReportHealthOfPeers() const {
+  unordered_map<string, HealthReportPB> reports;
+  std::lock_guard<simple_spinlock> lock(queue_lock_);
+  for (const auto& entry : peers_map_) {
+    const string& peer_uuid = entry.first;
+    const TrackedPeer* peer = entry.second;
+    HealthReportPB report;
+    auto overall_health = peer->last_overall_health_status;
+    // We always consider the local peer (ourselves) to be healthy.
+    // TODO(mpercy): Is this always a safe assumption?
+    if (peer_uuid == local_peer_pb_.permanent_uuid()) {
+      overall_health = HealthReportPB::HEALTHY;
+    }
+    report.set_overall_health(overall_health);
+    reports.emplace(peer_uuid, std::move(report));
+  }
+  return reports;
+}
+
 void PeerMessageQueue::CheckPeersInActiveConfigIfLeaderUnlocked() const {
   DCHECK(queue_lock_.is_locked());
   if (queue_state_.mode != LEADER) return;
@@ -369,20 +391,20 @@ OpId PeerMessageQueue::GetNextOpId() const {
                   queue_state_.last_appended.index() + 1);
 }
 
-bool PeerMessageQueue::SafeToEvict(const string& evict_uuid) {
+bool PeerMessageQueue::SafeToEvictUnlocked(const string& evict_uuid) const {
+  DCHECK(queue_lock_.is_locked());
   auto now = MonoTime::Now();
 
-  std::lock_guard<simple_spinlock> lock(queue_lock_);
   int remaining_voters = 0;
   int remaining_viable_voters = 0;
 
   for (const auto& e : peers_map_) {
     const auto& uuid = e.first;
     const auto& peer = e.second;
-    if (!IsRaftConfigVoter(uuid, *queue_state_.active_config)) {
+    if (uuid == evict_uuid) {
       continue;
     }
-    if (uuid == evict_uuid) {
+    if (!IsRaftConfigVoter(uuid, *queue_state_.active_config)) {
       continue;
     }
     remaining_voters++;
@@ -429,6 +451,76 @@ bool PeerMessageQueue::SafeToEvict(const string& evict_uuid) {
   return true;
 }
 
+void PeerMessageQueue::UpdatePeerHealthUnlocked(TrackedPeer* peer) {
+  DCHECK(queue_lock_.is_locked());
+
+  auto overall_health_status = PeerHealthStatus(*peer);
+
+  // Prepare error messages for different conditions.
+  string error_msg;
+  if (overall_health_status == HealthReportPB::FAILED) {
+    if (peer->last_exchange_status == PeerStatus::TABLET_FAILED) {
+      error_msg = Substitute("The tablet replica hosted on peer $0 has failed", peer->uuid);
+    } else if (!peer->wal_catchup_possible) {
+      error_msg = Substitute("The logs necessary to catch up peer $0 have been "
+                             "garbage collected. The replica will never be able "
+                             "to catch up", peer->uuid);
+    } else {
+      error_msg = Substitute("Leader has been unable to successfully communicate "
+                             "with peer $0 for more than $1 seconds ($2)",
+                             peer->uuid,
+                             FLAGS_follower_unavailable_considered_failed_sec,
+                             (MonoTime::Now() - peer->last_communication_time).ToString());
+    }
+  }
+
+  bool changed = overall_health_status != peer->last_overall_health_status;
+  peer->last_overall_health_status = overall_health_status;
+
+  if (FLAGS_raft_prepare_replacement_before_eviction) {
+    if (changed) {
+      if (overall_health_status == HealthReportPB::FAILED) {
+        // Only log when the status changes to FAILED.
+        LOG_WITH_PREFIX_UNLOCKED(INFO) << error_msg;
+      }
+      // Only notify when there is a change.
+      NotifyObserversOfPeerHealthChange();
+    }
+  } else {
+    if (overall_health_status == HealthReportPB::FAILED &&
+        SafeToEvictUnlocked(peer->uuid)) {
+      NotifyObserversOfFailedFollower(peer->uuid, queue_state_.current_term, error_msg);
+    }
+  }
+}
+
+HealthReportPB::HealthStatus PeerMessageQueue::PeerHealthStatus(const TrackedPeer& peer) {
+  // Unreachable peers are considered failed.
+  auto max_unreachable = MonoDelta::FromSeconds(FLAGS_follower_unavailable_considered_failed_sec);
+  if (MonoTime::Now() - peer.last_communication_time > max_unreachable) {
+    return HealthReportPB::FAILED;
+  }
+
+  // Replicas returning TABLET_FAILED status are considered to have FAILED health.
+  if (peer.last_exchange_status == PeerStatus::TABLET_FAILED) {
+    return HealthReportPB::FAILED;
+  }
+
+  // If we have never connected to this peer before, and we have not exceeded
+  // the unreachable timeout, its health is unknown.
+  if (peer.last_exchange_status == PeerStatus::NEW) {
+    return HealthReportPB::UNKNOWN;
+  }
+
+  // Tablets that have fallen behind the leader's retained WAL are considered failed.
+  if (!peer.wal_catchup_possible) {
+    return HealthReportPB::FAILED;
+  }
+
+  // All other cases are considered healthy.
+  return HealthReportPB::HEALTHY;
+}
+
 Status PeerMessageQueue::RequestForPeer(const string& uuid,
                                         ConsensusRequestPB* request,
                                         vector<ReplicateRefPtr>* msg_refs,
@@ -436,18 +528,18 @@ Status PeerMessageQueue::RequestForPeer(const string& uuid,
   // Maintain a thread-safe copy of necessary members.
   OpId preceding_id;
   int64_t current_term;
-  TrackedPeer peer;
+  TrackedPeer peer_copy;
   MonoDelta unreachable_time;
   {
     std::lock_guard<simple_spinlock> lock(queue_lock_);
     DCHECK_EQ(queue_state_.state, kQueueOpen);
     DCHECK_NE(uuid, local_peer_pb_.permanent_uuid());
 
-    TrackedPeer* peer_ptr = FindPtrOrNull(peers_map_, uuid);
-    if (PREDICT_FALSE(peer_ptr == nullptr || queue_state_.mode == NON_LEADER)) {
+    TrackedPeer* peer = FindPtrOrNull(peers_map_, uuid);
+    if (PREDICT_FALSE(peer == nullptr || queue_state_.mode == NON_LEADER)) {
       return Status::NotFound("Peer not tracked or queue not in leader mode.");
     }
-    peer = *peer_ptr;
+    peer_copy = *peer;
 
     // Clear the requests without deleting the entries, as they may be in use by other peers.
     request->mutable_ops()->ExtractSubrange(0, request->ops_size(), nullptr);
@@ -461,19 +553,22 @@ Status PeerMessageQueue::RequestForPeer(const string& uuid,
     request->set_all_replicated_index(queue_state_.all_replicated_index);
     request->set_last_idx_appended_to_leader(queue_state_.last_appended.index());
     request->set_caller_term(current_term);
-    unreachable_time = MonoTime::Now() - peer.last_communication_time;
-  }
-  if (unreachable_time.ToSeconds() > FLAGS_follower_unavailable_considered_failed_sec) {
-    if (SafeToEvict(uuid)) {
-      string msg = Substitute("Leader has been unable to successfully communicate "
-                              "with Peer $0 for more than $1 seconds ($2)",
-                              uuid,
-                              FLAGS_follower_unavailable_considered_failed_sec,
-                              unreachable_time.ToString());
-      NotifyObserversOfFailedFollower(uuid, current_term, msg);
-    }
+    unreachable_time = MonoTime::Now() - peer_copy.last_communication_time;
   }
-  if (peer.last_exchange_status == PeerStatus::TABLET_NOT_FOUND) {
+
+  // Always trigger a health status update check at the end of this function.
+  bool wal_catchup_progress = false;
+  bool wal_catchup_failure = false;
+  SCOPED_CLEANUP({
+      std::lock_guard<simple_spinlock> lock(queue_lock_);
+      TrackedPeer* peer = FindPtrOrNull(peers_map_, uuid);
+      if (!peer) return;
+      if (wal_catchup_progress) peer->wal_catchup_possible = true;
+      if (wal_catchup_failure) peer->wal_catchup_possible = false;
+      UpdatePeerHealthUnlocked(peer);
+    });
+
+  if (peer_copy.last_exchange_status == PeerStatus::TABLET_NOT_FOUND) {
     VLOG(3) << LogPrefixUnlocked() << "Peer " << uuid << " needs tablet copy" << THROTTLE_MSG;
     *needs_tablet_copy = true;
     return Status::OK();
@@ -483,14 +578,14 @@ Status PeerMessageQueue::RequestForPeer(const string& uuid,
   // If we've never communicated with the peer, we don't know what messages to
   // send, so we'll send a status-only request. Otherwise, we grab requests
   // from the log starting at the last_received point.
-  if (peer.last_exchange_status != PeerStatus::NEW) {
+  if (peer_copy.last_exchange_status != PeerStatus::NEW) {
 
     // The batch of messages to send to the peer.
     vector<ReplicateRefPtr> messages;
     int max_batch_size = FLAGS_consensus_max_batch_size_bytes - request->ByteSize();
 
     // We try to get the follower's next_index from our log.
-    Status s = log_cache_.ReadOps(peer.next_index - 1,
+    Status s = log_cache_.ReadOps(peer_copy.next_index - 1,
                                   max_batch_size,
                                   &messages,
                                   &preceding_id);
@@ -501,7 +596,7 @@ Status PeerMessageQueue::RequestForPeer(const string& uuid,
         string msg = Substitute("The logs necessary to catch up peer $0 have been "
                                 "garbage collected. The follower will never be able "
                                 "to catch up ($1)", uuid, s.ToString());
-        NotifyObserversOfFailedFollower(uuid, current_term, msg);
+        wal_catchup_failure = true;
         return s;
       // IsIncomplete() means that we tried to read beyond the head of the log
       // (in the future). See KUDU-1078.
@@ -510,14 +605,18 @@ Status PeerMessageQueue::RequestForPeer(const string& uuid,
         LOG_WITH_PREFIX_UNLOCKED(ERROR) << "Error trying to read ahead of the log "
                                         << "while preparing peer request: "
                                         << s.ToString() << ". Destination peer: "
-                                        << peer.ToString();
+                                        << peer_copy.ToString();
         return s;
       }
       LOG_WITH_PREFIX_UNLOCKED(FATAL) << "Error reading the log while preparing peer request: "
                                       << s.ToString() << ". Destination peer: "
-                                      << peer.ToString();
+                                      << peer_copy.ToString();
     }
 
+    // Since we were able to read ops through the log cache, we know that
+    // catchup is possible.
+    wal_catchup_progress = true;
+
     // We use AddAllocated rather than copy, because we pin the log cache at the
     // "all replicated" point. At some point we may want to allow partially loading
     // (and not pinning) earlier messages. At that point we'll need to do something
@@ -537,7 +636,7 @@ Status PeerMessageQueue::RequestForPeer(const string& uuid,
   if (request->ops_size() > 0) {
     int64_t last_op_sent = request->ops(request->ops_size() - 1).id().index();
     if (last_op_sent < request->committed_index()) {
-      KLOG_EVERY_N_SECS_THROTTLER(INFO, 3, peer.status_log_throttler, "lagging")
+      KLOG_EVERY_N_SECS_THROTTLER(INFO, 3, peer_copy.status_log_throttler, "lagging")
           << LogPrefixUnlocked() << "Peer " << uuid << " is lagging by at least "
           << (request->committed_index() - last_op_sent)
           << " ops behind the committed index " << THROTTLE_MSG;
@@ -719,11 +818,7 @@ void PeerMessageQueue::UpdatePeerStatus(const string& peer_uuid,
       break;
 
     case PeerStatus::TABLET_FAILED: {
-      // Use the current term to ensure the peer will be evicted, otherwise this
-      // notification may be ignored.
-      int64_t current_term = queue_state_.current_term;
-      l.unlock();
-      NotifyObserversOfFailedFollower(peer_uuid, current_term, status.ToString());
+      UpdatePeerHealthUnlocked(peer);
       return;
     }
 
@@ -1150,6 +1245,24 @@ void PeerMessageQueue::NotifyObserversOfFailedFollowerTask(const string& uuid,
   }
 }
 
+void PeerMessageQueue::NotifyObserversOfPeerHealthChange() {
+  WARN_NOT_OK(raft_pool_observers_token_->SubmitClosure(
+      Bind(&PeerMessageQueue::NotifyObserversOfPeerHealthChangeTask, Unretained(this))),
+              LogPrefixUnlocked() + "Unable to notify RaftConsensus peer health change.");
+}
+
+void PeerMessageQueue::NotifyObserversOfPeerHealthChangeTask() {
+  MAYBE_INJECT_RANDOM_LATENCY(FLAGS_consensus_inject_latency_ms_in_notifications);
+  std::vector<PeerMessageQueueObserver*> observers_copy;
+  {
+    std::lock_guard<simple_spinlock> lock(queue_lock_);
+    observers_copy = observers_;
+  }
+  for (PeerMessageQueueObserver* observer : observers_copy) {
+    observer->NotifyPeerHealthChange();
+  }
+}
+
 PeerMessageQueue::~PeerMessageQueue() {
   Close();
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/88e39bad/src/kudu/consensus/consensus_queue.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.h b/src/kudu/consensus/consensus_queue.h
index c3cf620..871089b 100644
--- a/src/kudu/consensus/consensus_queue.h
+++ b/src/kudu/consensus/consensus_queue.h
@@ -120,6 +120,8 @@ class PeerMessageQueue {
           last_known_committed_index(MinimumOpId().index()),
           last_exchange_status(PeerStatus::NEW),
           last_communication_time(MonoTime::Now()),
+          wal_catchup_possible(true),
+          last_overall_health_status(HealthReportPB::UNKNOWN),
           last_seen_term_(0) {}
 
     TrackedPeer() = default;
@@ -165,6 +167,13 @@ class PeerMessageQueue {
     // successful communication ever took place.
     MonoTime last_communication_time;
 
+    // Set to false if it is determined that the remote peer has fallen behind
+    // the local peer's WAL.
+    bool wal_catchup_possible;
+
+    // The peer's latest overall health status.
+    HealthReportPB::HealthStatus last_overall_health_status;
+
     // Throttler for how often we will log status messages pertaining to this
     // peer (eg when it is lagging, etc).
     logging::LogThrottler status_log_throttler;
@@ -211,6 +220,10 @@ class PeerMessageQueue {
   // Makes the queue untrack this peer.
   void UntrackPeer(const std::string& uuid);
 
+  // Returns a health report for all active peers.
+  // Returns IllegalState if the local peer is not the leader of the config.
+  std::unordered_map<std::string, HealthReportPB> ReportHealthOfPeers() const;
+
   // Appends a single message to be replicated to the peers.
   // Returns OK unless the message could not be added to the queue for some
   // reason (e.g. the queue reached max size).
@@ -419,7 +432,14 @@ class PeerMessageQueue {
 
   // Return true if it would be safe to evict the peer 'evict_uuid' at this
   // point in time.
-  bool SafeToEvict(const std::string& evict_uuid);
+  bool SafeToEvictUnlocked(const std::string& evict_uuid) const;
+
+  // Update a peer's last_health_status field and trigger the appropriate
+  // notifications.
+  void UpdatePeerHealthUnlocked(TrackedPeer* peer);
+
+  // Calculate a peer's up-to-date health status based on internal fields.
+  static HealthReportPB::HealthStatus PeerHealthStatus(const TrackedPeer& peer);
 
   void NotifyObserversOfCommitIndexChange(int64_t new_commit_index);
   void NotifyObserversOfCommitIndexChangeTask(int64_t new_commit_index);
@@ -434,6 +454,9 @@ class PeerMessageQueue {
                                            int64_t term,
                                            const std::string& reason);
 
+  void NotifyObserversOfPeerHealthChange();
+  void NotifyObserversOfPeerHealthChangeTask();
+
   typedef std::unordered_map<std::string, TrackedPeer*> PeersMap;
 
   std::string ToStringUnlocked() const;
@@ -520,6 +543,9 @@ class PeerMessageQueueObserver {
                                     int64_t term,
                                     const std::string& reason) = 0;
 
+  // Notify the observer that the health of one of the peers has changed.
+  virtual void NotifyPeerHealthChange() = 0;
+
   virtual ~PeerMessageQueueObserver() {}
 };
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/88e39bad/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index 58cc087..45066bb 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -24,8 +24,9 @@
 #include <memory>
 #include <mutex>
 #include <ostream>
-#include <unordered_set>
 #include <type_traits>
+#include <unordered_map>
+#include <unordered_set>
 
 #include <boost/optional/optional.hpp>
 #include <gflags/gflags.h>
@@ -778,6 +779,10 @@ void RaftConsensus::NotifyFailedFollower(const string& uuid,
               LogPrefixThreadSafe() + "Unable to start RemoteFollowerTask");
 }
 
+void RaftConsensus::NotifyPeerHealthChange() {
+  MarkDirty("Peer health change");
+}
+
 void RaftConsensus::TryRemoveFollowerTask(const string& uuid,
                                           const RaftConfigPB& committed_config,
                                           const std::string& reason) {
@@ -2166,10 +2171,31 @@ const string& RaftConsensus::tablet_id() const {
   return options_.tablet_id;
 }
 
-ConsensusStatePB RaftConsensus::ConsensusState() const {
+ConsensusStatePB RaftConsensus::ConsensusState(IncludeHealthReport report_health) const {
   ThreadRestrictions::AssertWaitAllowed();
-  LockGuard l(lock_);
-  return cmeta_->ToConsensusStatePB();
+  UniqueLock l(lock_);
+  ConsensusStatePB cstate = cmeta_->ToConsensusStatePB();
+
+  // If we need to include the health report, merge it into the committed
+  // config iff we believe we are the current leader of the config.
+  if (report_health == INCLUDE_HEALTH_REPORT &&
+      cmeta_->active_role() == RaftPeerPB::LEADER) {
+    auto reports = queue_->ReportHealthOfPeers();
+
+    // We don't need to access the queue anymore, so drop the consensus lock.
+    l.unlock();
+
+    // Iterate through each peer in the committed config and attach the health
+    // report to it.
+    RaftConfigPB* committed_raft_config = cstate.mutable_committed_config();
+    for (int i = 0; i < committed_raft_config->peers_size(); i++) {
+      RaftPeerPB* peer = committed_raft_config->mutable_peers(i);
+      const HealthReportPB* report = FindOrNull(reports, peer->permanent_uuid());
+      if (!report) continue; // Only attach details if we know about the peer.
+      *peer->mutable_health_report() = *report;
+    }
+  }
+  return cstate;
 }
 
 RaftConfigPB RaftConsensus::CommittedConfig() const {

http://git-wip-us.apache.org/repos/asf/kudu/blob/88e39bad/src/kudu/consensus/raft_consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index 7229bb7..fb43e10 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -277,8 +277,16 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
 
   scoped_refptr<TimeManager> time_manager() const { return time_manager_; }
 
+  enum IncludeHealthReport {
+    EXCLUDE_HEALTH_REPORT,
+    INCLUDE_HEALTH_REPORT
+  };
+
   // Returns a copy of the state of the consensus system.
-  ConsensusStatePB ConsensusState() const;
+  // If 'report_health' is set to 'INCLUDE_HEALTH_REPORT', and if the
+  // local replica believes it is the leader of the config, it will include a
+  // health report about each active peer in the committed config.
+  ConsensusStatePB ConsensusState(IncludeHealthReport report_health = EXCLUDE_HEALTH_REPORT) const;
 
   // Returns a copy of the current committed Raft configuration.
   RaftConfigPB CommittedConfig() const;
@@ -309,13 +317,15 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
   // Updates the committed_index and triggers the Apply()s for whatever
   // transactions were pending.
   // This is idempotent.
-  void NotifyCommitIndex(int64_t commit_index);
+  void NotifyCommitIndex(int64_t commit_index) override;
 
-  void NotifyTermChange(int64_t term);
+  void NotifyTermChange(int64_t term) override;
 
   void NotifyFailedFollower(const std::string& uuid,
                             int64_t term,
-                            const std::string& reason);
+                            const std::string& reason) override;
+
+  void NotifyPeerHealthChange() override;
 
   // Return the log indexes which the consensus implementation would like to retain.
   //

http://git-wip-us.apache.org/repos/asf/kudu/blob/88e39bad/src/kudu/integration-tests/raft_consensus-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc
index e41a331..c31d7eb 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -2424,7 +2424,8 @@ TEST_F(RaftConsensusITest, TestUpdateConsensusErrorNonePrepared) {
 TEST_F(RaftConsensusITest, TestCorruptReplicaMetadata) {
   // Start cluster and wait until we have a stable leader.
   // Switch off tombstoning of evicted replicas to observe the failed tablet state.
-  NO_FATALS(BuildAndStart({}, { "--master_tombstone_evicted_tablet_replicas=false" }));
+  NO_FATALS(BuildAndStart({ "--consensus_rpc_timeout_ms=10000" }, // Ensure we are safe to evict.
+                          { "--master_tombstone_evicted_tablet_replicas=false" }));
   ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_,
                                   tablet_id_, 1));
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/88e39bad/src/kudu/tserver/ts_tablet_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index 9e234ec..bef2d9d 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -29,6 +29,7 @@
 #include <boost/bind.hpp> // IWYU pragma: keep
 #include <boost/optional/optional.hpp>
 #include <gflags/gflags.h>
+#include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 
 #include "kudu/clock/clock.h"
@@ -118,6 +119,8 @@ DEFINE_int32(tablet_state_walk_min_period_ms, 1000,
              "tablet map to update tablet state counts.");
 TAG_FLAG(tablet_state_walk_min_period_ms, advanced);
 
+DECLARE_bool(raft_prepare_replacement_before_eviction);
+
 METRIC_DEFINE_gauge_int32(server, tablets_num_not_initialized,
                           "Number of Not Initialized Tablets",
                           kudu::MetricUnit::kTablets,
@@ -1153,7 +1156,10 @@ void TSTabletManager::CreateReportedTabletPB(const scoped_refptr<TabletReplica>&
   // We cannot get consensus state information unless the TabletReplica is running.
   shared_ptr<consensus::RaftConsensus> consensus = replica->shared_consensus();
   if (consensus) {
-    *reported_tablet->mutable_consensus_state() = consensus->ConsensusState();
+    auto include_health = FLAGS_raft_prepare_replacement_before_eviction ?
+                          RaftConsensus::INCLUDE_HEALTH_REPORT :
+                          RaftConsensus::EXCLUDE_HEALTH_REPORT;
+    *reported_tablet->mutable_consensus_state() = consensus->ConsensusState(include_health);
   }
 }
 


[5/6] kudu git commit: shutdown tablets on disk failure at runtime

Posted by mp...@apache.org.
shutdown tablets on disk failure at runtime

Before, various code paths passed along disk failure Statuses until they
eventually hit a CHECK failure and crashed the server. Such fatal errors
were "safe" by design, as they would ensure no additional changes were
made durable to each tablet. This patch aims to achieve similar behavior
for failed replicas while keeping the server alive.

These failures are permitted provided the following have occurred for
each tablet in the affected directory:
- The failed directory is immediately marked as failed, preventing
  further tablets from being striped across a failed disk.
- The tablet's MvccManager is shut down, preventing further writes from
  being made durable and preventing I/O to the tablet.
- A request is submitted to a threadpool to eventually completely shut
  down the replica, leaving it for eviction.

NOTE: failures of metadata files and the WAL directory are fatal. Code
paths that update these explicitly crash the server.

This is a part of a series of patches to handle disk failure. To see how
this patch fits in, see section 3 of:
https://docs.google.com/document/d/1yGVzDzV14mKReZ7EzlZZV_KfDBRnHJkRtlDox_RPXAA/edit

Change-Id: I109635a54268b9db741b2ae9ea3e9f1fe072d0a8
Reviewed-on: http://gerrit.cloudera.org:8080/7442
Tested-by: Kudu Jenkins
Reviewed-by: Andrew Wong <aw...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/b4368459
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/b4368459
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/b4368459

Branch: refs/heads/master
Commit: b436845989d7519fe473ca3579d3385c4a2bdaf3
Parents: 5523b0a
Author: Andrew Wong <aw...@cloudera.com>
Authored: Wed Nov 22 16:02:06 2017 -0800
Committer: Andrew Wong <aw...@cloudera.com>
Committed: Thu Nov 23 04:41:55 2017 +0000

----------------------------------------------------------------------
 src/kudu/tablet/tablet.cc                   |   6 +-
 src/kudu/tablet/tablet.h                    |   6 +
 src/kudu/tablet/tablet_replica.cc           |  22 +++-
 src/kudu/tablet/tablet_replica.h            |   3 +
 src/kudu/tserver/tablet_copy_client-test.cc |   5 +-
 src/kudu/tserver/ts_tablet_manager.cc       | 139 +++++++++++++++++------
 src/kudu/tserver/ts_tablet_manager.h        |  13 +++
 7 files changed, 152 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/b4368459/src/kudu/tablet/tablet.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index 887bb7e..5e0c805 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -1065,10 +1065,7 @@ Status Tablet::Flush() {
 
 Status Tablet::FlushUnlocked() {
   TRACE_EVENT0("tablet", "Tablet::FlushUnlocked");
-  {
-    std::lock_guard<simple_spinlock> l(state_lock_);
-    RETURN_NOT_OK(CheckHasNotBeenStoppedUnlocked());
-  }
+  RETURN_NOT_OK(CheckHasNotBeenStopped());
   RowSetsInCompaction input;
   shared_ptr<MemRowSet> old_mrs;
   {
@@ -2256,6 +2253,7 @@ Tablet::Iterator::Iterator(const Tablet* tablet, const Schema& projection,
 Tablet::Iterator::~Iterator() {}
 
 Status Tablet::Iterator::Init(ScanSpec *spec) {
+  RETURN_NOT_OK(tablet_->CheckHasNotBeenStopped());
   DCHECK(iter_.get() == nullptr);
 
   RETURN_NOT_OK(tablet_->GetMappedReadProjection(projection_, &projection_));

http://git-wip-us.apache.org/repos/asf/kudu/blob/b4368459/src/kudu/tablet/tablet.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h
index 15bca73..38638c0 100644
--- a/src/kudu/tablet/tablet.h
+++ b/src/kudu/tablet/tablet.h
@@ -482,6 +482,12 @@ class Tablet {
   // Must be called while 'state_lock_' is held.
   Status CheckHasNotBeenStoppedUnlocked() const;
 
+  // Returns an error if the tablet is in the 'kStopped' or 'kShutdown' state.
+  Status CheckHasNotBeenStopped() const {
+    std::lock_guard<simple_spinlock> l(state_lock_);
+    return CheckHasNotBeenStoppedUnlocked();
+  }
+
   Status FlushUnlocked();
 
   // Validate the contents of 'op' and return a bad Status if it is invalid.

http://git-wip-us.apache.org/repos/asf/kudu/blob/b4368459/src/kudu/tablet/tablet_replica.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica.cc b/src/kudu/tablet/tablet_replica.cc
index d44e6b4..537f728 100644
--- a/src/kudu/tablet/tablet_replica.cc
+++ b/src/kudu/tablet/tablet_replica.cc
@@ -224,8 +224,12 @@ Status TabletReplica::Start(const ConsensusBootstrapInfo& bootstrap_info,
         metric_entity,
         mark_dirty_clbk_));
 
-    // Re-acquire 'lock_' to update our state variable.
     std::lock_guard<simple_spinlock> l(lock_);
+
+    // If an error has been set (e.g. due to a disk failure from a separate
+    // thread), error out.
+    RETURN_NOT_OK(error_);
+
     CHECK_EQ(BOOTSTRAPPING, state_); // We are still protected by 'state_change_lock_'.
     set_state(RUNNING);
   }
@@ -737,6 +741,22 @@ size_t TabletReplica::OnDiskSize() const {
   return ret;
 }
 
+void TabletReplica::MakeUnavailable(const Status& error) {
+  std::shared_ptr<Tablet> tablet;
+  {
+    std::lock_guard<simple_spinlock> lock(lock_);
+    tablet = tablet_;
+    for (MaintenanceOp* op : maintenance_ops_) {
+      op->CancelAndDisable();
+    }
+  }
+  // Stop the Tablet from doing further I/O.
+  if (tablet) tablet->Stop();
+
+  // Set the error; when the replica is shut down, it will end up FAILED.
+  SetError(error);
+}
+
 Status FlushInflightsToLogCallback::WaitForInflightsAndFlushLog() {
   // This callback is triggered prior to any TabletMetadata flush.
   // The guarantee that we are trying to enforce is this:

http://git-wip-us.apache.org/repos/asf/kudu/blob/b4368459/src/kudu/tablet/tablet_replica.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica.h b/src/kudu/tablet/tablet_replica.h
index 27829ba..bf3476d 100644
--- a/src/kudu/tablet/tablet_replica.h
+++ b/src/kudu/tablet/tablet_replica.h
@@ -274,6 +274,9 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
   // Only to be used in tests.
   void CancelMaintenanceOpsForTests();
 
+  // Stops further I/O on the replica.
+  void MakeUnavailable(const Status& error);
+
   // Return pointer to the transaction tracker for this peer.
   const TransactionTracker* transaction_tracker() const { return &txn_tracker_; }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/b4368459/src/kudu/tserver/tablet_copy_client-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_client-test.cc b/src/kudu/tserver/tablet_copy_client-test.cc
index 277d8ff..7ac12cd 100644
--- a/src/kudu/tserver/tablet_copy_client-test.cc
+++ b/src/kudu/tserver/tablet_copy_client-test.cc
@@ -293,10 +293,11 @@ TEST_F(TabletCopyClientTest, TestFailedDiskStopsClient) {
     }
   });
 
-  // In a separate thread, mark one of the directories as failed.
+  // In a separate thread, mark one of the directories as failed (not the
+  // metadata directory).
   while (true) {
     if (rand() % 10 == 0) {
-      dd_manager->MarkDataDirFailed(0, "injected failure in non-client thread");
+      dd_manager->MarkDataDirFailed(1, "injected failure in non-client thread");
       LOG(INFO) << "INJECTING FAILURE";
       break;
     }

http://git-wip-us.apache.org/repos/asf/kudu/blob/b4368459/src/kudu/tserver/ts_tablet_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index 47eae78..9e234ec 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -21,6 +21,7 @@
 #include <memory>
 #include <mutex>
 #include <ostream>
+#include <set>
 #include <string>
 #include <utility>
 #include <vector>
@@ -52,6 +53,7 @@
 #include "kudu/master/master.pb.h"
 #include "kudu/rpc/result_tracker.h"
 #include "kudu/tablet/metadata.pb.h"
+#include "kudu/tablet/tablet.h"
 #include "kudu/tablet/tablet_bootstrap.h"
 #include "kudu/tablet/tablet_metadata.h"
 #include "kudu/tablet/tablet_replica.h"
@@ -156,6 +158,7 @@ METRIC_DEFINE_gauge_int32(server, tablets_num_shutdown,
                           kudu::MetricUnit::kTablets,
                           "Number of tablets currently shut down");
 
+using std::set;
 using std::shared_ptr;
 using std::string;
 using std::vector;
@@ -163,10 +166,6 @@ using strings::Substitute;
 
 namespace kudu {
 
-namespace tablet {
-class Tablet;
-}
-
 using consensus::ConsensusMetadata;
 using consensus::ConsensusMetadataCreateMode;
 using consensus::ConsensusMetadataManager;
@@ -737,6 +736,35 @@ Status TSTabletManager::CreateAndRegisterTabletReplica(
   return Status::OK();
 }
 
+Status TSTabletManager::BeginReplicaStateTransition(
+    const string& tablet_id,
+    const string& reason,
+    scoped_refptr<TabletReplica>* replica,
+    scoped_refptr<TransitionInProgressDeleter>* deleter,
+    TabletServerErrorPB::Code* error_code) {
+  // Acquire the lock in exclusive mode as we'll add a entry to the
+  // transition_in_progress_ map.
+  std::lock_guard<RWMutex> lock(lock_);
+  TRACE("Acquired tablet manager lock");
+  RETURN_NOT_OK(CheckRunningUnlocked(error_code));
+
+  if (!LookupTabletUnlocked(tablet_id, replica)) {
+    if (error_code) {
+      *error_code = TabletServerErrorPB::TABLET_NOT_FOUND;
+    }
+    return Status::NotFound("Tablet not found", tablet_id);
+  }
+  // Sanity check that the tablet's transition isn't already in progress
+  Status s = StartTabletStateTransitionUnlocked(tablet_id, reason, deleter);
+  if (PREDICT_FALSE(!s.ok())) {
+    if (error_code) {
+      *error_code = TabletServerErrorPB::TABLET_NOT_RUNNING;
+    }
+    return s;
+  }
+  return Status::OK();
+}
+
 Status TSTabletManager::DeleteTablet(
     const string& tablet_id,
     TabletDataState delete_type,
@@ -754,31 +782,11 @@ Status TSTabletManager::DeleteTablet(
 
   scoped_refptr<TabletReplica> replica;
   scoped_refptr<TransitionInProgressDeleter> deleter;
-  {
-    // Acquire the lock in exclusive mode as we'll add a entry to the
-    // transition_in_progress_ map.
-    std::lock_guard<RWMutex> lock(lock_);
-    TRACE("Acquired tablet manager lock");
-    RETURN_NOT_OK(CheckRunningUnlocked(error_code));
+  RETURN_NOT_OK(BeginReplicaStateTransition(tablet_id, "deleting tablet", &replica,
+                                            &deleter, error_code));
 
-    if (!LookupTabletUnlocked(tablet_id, &replica)) {
-      if (error_code) {
-        *error_code = TabletServerErrorPB::TABLET_NOT_FOUND;
-      }
-      return Status::NotFound("Tablet not found", tablet_id);
-    }
-    // Sanity check that the tablet's deletion isn't already in progress
-    Status s = StartTabletStateTransitionUnlocked(tablet_id, "deleting tablet", &deleter);
-    if (PREDICT_FALSE(!s.ok())) {
-      if (error_code) {
-        *error_code = TabletServerErrorPB::TABLET_NOT_RUNNING;
-      }
-      return s;
-    }
-  }
-
-  // If the tablet is already deleted, the CAS check isn't possible because
-  // consensus and therefore the log is not available.
+  // If the tablet has been deleted or forcefully shut down, the CAS check
+  // isn't possible because consensus and therefore the log is not available.
   TabletDataState data_state = replica->tablet_metadata()->tablet_data_state();
   bool tablet_already_deleted = (data_state == TABLET_DATA_DELETED ||
                                  data_state == TABLET_DATA_TOMBSTONED);
@@ -881,7 +889,7 @@ Status TSTabletManager::StartTabletStateTransitionUnlocked(
   }
 
   if (!InsertIfNotPresent(&transition_in_progress_, tablet_id, reason)) {
-    return Status::IllegalState(
+    return Status::AlreadyPresent(
         Substitute("State transition of tablet $0 already in progress: $1",
                     tablet_id, transition_in_progress_[tablet_id]));
   }
@@ -1259,7 +1267,7 @@ Status TSTabletManager::DeleteTabletData(
   CHECK(delete_type == TABLET_DATA_DELETED ||
         delete_type == TABLET_DATA_TOMBSTONED ||
         delete_type == TABLET_DATA_COPYING)
-      << "Unexpected delete_type to delete tablet " << meta->tablet_id() << ": "
+      << "Unexpected delete_type to delete tablet " << tablet_id << ": "
       << TabletDataState_Name(delete_type) << " (" << delete_type << ")";
 
   // Note: Passing an unset 'last_logged_opid' will retain the last_logged_opid
@@ -1271,7 +1279,7 @@ Status TSTabletManager::DeleteTabletData(
             << (last_logged_opid ? OpIdToString(*last_logged_opid) : "(unknown)");
   MAYBE_FAULT(FLAGS_fault_crash_after_blocks_deleted);
 
-  RETURN_NOT_OK(Log::DeleteOnDiskData(meta->fs_manager(), meta->tablet_id()));
+  CHECK_OK(Log::DeleteOnDiskData(meta->fs_manager(), tablet_id));
   MAYBE_FAULT(FLAGS_fault_crash_after_wal_deleted);
 
   // We do not delete the superblock or the consensus metadata when tombstoning
@@ -1285,13 +1293,25 @@ Status TSTabletManager::DeleteTabletData(
   DCHECK_EQ(TABLET_DATA_DELETED, delete_type);
 
   LOG(INFO) << LogPrefix(tablet_id, meta->fs_manager()) << "Deleting consensus metadata";
-  Status s = cmeta_manager->Delete(meta->tablet_id());
+  Status s = cmeta_manager->Delete(tablet_id);
   // NotFound means we already deleted the cmeta in a previous attempt.
   if (PREDICT_FALSE(!s.ok() && !s.IsNotFound())) {
+    if (s.IsDiskFailure()) {
+      LOG(FATAL) << LogPrefix(tablet_id, meta->fs_manager())
+                 << "consensus metadata is on a failed disk";
+    }
     return s;
   }
   MAYBE_FAULT(FLAGS_fault_crash_after_cmeta_deleted);
-  return meta->DeleteSuperBlock();
+  s = meta->DeleteSuperBlock();
+  if (PREDICT_FALSE(!s.ok())) {
+    if (s.IsDiskFailure()) {
+      LOG(FATAL) << LogPrefix(tablet_id, meta->fs_manager())
+                 << "tablet metadata is on a failed disk";
+    }
+    return s;
+  }
+  return Status::OK();
 }
 
 void TSTabletManager::FailTabletsInDataDir(const string& uuid) {
@@ -1299,8 +1319,57 @@ void TSTabletManager::FailTabletsInDataDir(const string& uuid) {
   int uuid_idx;
   CHECK(dd_manager->FindUuidIndexByUuid(uuid, &uuid_idx))
       << Substitute("No data directory found with UUID $0", uuid);
-  LOG(FATAL) << Substitute("Data directory $0 failed. Disk failure handling not implemented",
-                           dd_manager->FindDataDirByUuidIndex(uuid_idx)->dir());
+  if (fs_manager_->dd_manager()->IsDataDirFailed(uuid_idx)) {
+    LOG(WARNING) << "Data directory is already marked failed.";
+    return;
+  }
+  // Fail the directory to prevent other tablets from being placed in it.
+  dd_manager->MarkDataDirFailed(uuid_idx);
+  set<string> tablets = dd_manager->FindTabletsByDataDirUuidIdx(uuid_idx);
+  LOG(INFO) << Substitute("Data dir $0 has $1 tablets", uuid, tablets.size());
+  for (const string& tablet_id : dd_manager->FindTabletsByDataDirUuidIdx(uuid_idx)) {
+    FailTabletAndScheduleShutdown(tablet_id);
+  }
+}
+
+void TSTabletManager::FailTabletAndScheduleShutdown(const string& tablet_id) {
+  LOG(INFO) << LogPrefix(tablet_id, fs_manager_) << "failing tablet";
+  scoped_refptr<TabletReplica> replica;
+  if (LookupTablet(tablet_id, &replica)) {
+    // Stop further IO to the replica and set an error in the replica.
+    // When the replica is shutdown, this will leave it in a FAILED state.
+    replica->MakeUnavailable(Status::IOError("failing tablet"));
+
+    // Submit a request to actually shut down the tablet asynchronously.
+    CHECK_OK(open_tablet_pool_->SubmitFunc([tablet_id, this]() {
+      scoped_refptr<TabletReplica> replica;
+      scoped_refptr<TransitionInProgressDeleter> deleter;
+      TabletServerErrorPB::Code error;
+      Status s;
+      // Transition tablet state to ensure nothing else (e.g. tablet copies,
+      // deletions, etc) happens concurrently.
+      while (true) {
+        s = BeginReplicaStateTransition(tablet_id, "failing tablet",
+                                        &replica, &deleter, &error);
+        if (!s.IsAlreadyPresent()) {
+          break;
+        }
+        SleepFor(MonoDelta::FromMilliseconds(10));
+      }
+      // Success: we started the transition.
+      //
+      // Only proceed if there is no Tablet (e.g. a bootstrap terminated early
+      // due to error before creating the Tablet) or if the tablet has been
+      // stopped (e.g. due to the above call to MakeUnavailable).
+      std::shared_ptr<Tablet> tablet = replica->shared_tablet();
+      if (s.ok() && (!tablet || tablet->HasBeenStopped())) {
+        replica->Shutdown();
+      }
+      // Else: the tablet is healthy, or is already either not running or
+      // deleted (e.g. because another thread was able to successfully create a
+      // new replica).
+    }));
+  }
 }
 
 int TSTabletManager::RefreshTabletStateCacheAndReturnCount(tablet::TabletStatePB st) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/b4368459/src/kudu/tserver/ts_tablet_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/ts_tablet_manager.h b/src/kudu/tserver/ts_tablet_manager.h
index 317ab9f..a02a13c 100644
--- a/src/kudu/tserver/ts_tablet_manager.h
+++ b/src/kudu/tserver/ts_tablet_manager.h
@@ -199,6 +199,10 @@ class TSTabletManager : public tserver::TabletReplicaLookupIf {
       tablet::TabletDataState delete_type,
       boost::optional<consensus::OpId> last_logged_opid);
 
+  // Synchronously makes the specified tablet unavailable for further I/O and
+  // schedules its asynchronous shutdown.
+  void FailTabletAndScheduleShutdown(const std::string& tablet_id);
+
   // Forces shutdown of the tablet replicas in the data dir corresponding to 'uuid'.
   void FailTabletsInDataDir(const std::string& uuid);
 
@@ -236,6 +240,15 @@ class TSTabletManager : public tserver::TabletReplicaLookupIf {
                                             const std::string& reason,
                                             scoped_refptr<TransitionInProgressDeleter>* deleter);
 
+  // Marks the replica indicated by 'tablet_id' as being in a transitional
+  // state. Returns an error status if the replica is already in a transitional
+  // state.
+  Status BeginReplicaStateTransition(const std::string& tablet_id,
+                                     const std::string& reason,
+                                     scoped_refptr<tablet::TabletReplica>* replica,
+                                     scoped_refptr<TransitionInProgressDeleter>* deleter,
+                                     TabletServerErrorPB::Code* error_code);
+
   // Open a tablet meta from the local file system by loading its superblock.
   Status OpenTabletMeta(const std::string& tablet_id,
                         scoped_refptr<tablet::TabletMetadata>* metadata);