You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2017/11/27 06:16:26 UTC

[2/6] kudu git commit: error_manager: synchronize/serialize handling

error_manager: synchronize/serialize handling

The FsErrorManager is helper infrastructure that other classes can use to
help provide API contracts related to error handling.

Here is an example error-handling contract provided to the TSTabletManager
in a future patch: if a disk failure Status is returned during tablet
operation, either:
1) the tablet server will crash and we can rely on Kudu's crash-consistency
   mechanisms for safety, or
2) any affected Tablet will transition to the 'kStopped' state via an error
   manager callback. Most components will simply pass the non-OK Status up
   the call chain. However, if a method expects an IO operation to
   succeed for correctness purposes, and it receives an error Status,
   then it should check that the Tablet is stopped. If so, it can be
   assumed that the error was handled. Otherwise, Kudu must rely on the
   crash-consistency mechanisms and crash.

Given the above contract, the state of a tablet server post-disk-failure
depends significantly on the completion of disk-failure-handling
callbacks. Error-handling _must_ finish before anything is propagated
back to the offending caller.

To ensure correct completion of error-handling even in a concurrent
setting, this patch extends the error manager with a mutex and a second
error-handling callback type. This ensures that errors indirectly caused
by disk failures can be handled by non-disk-specific handling,
serializing failure-handling in the same fashion.

As an example of where this is necessary, say a tablet has data in a
single directory and hits a bad disk. That directory is immediately
marked failed and handling starts to fail all tablets in the directory.
Before, if the tablet were to create a new block before being failed, it
would fail immediately, complaining that no directories are available,
and would eventually fail a CHECK that translates roughly to: "Has error
handling for this tablet completed?" By wrapping block creation with
tablet-specific error handling and with serialized error-handling, this
CHECK will pass.

A previous revision accomplished this by using a single-threaded
threadpool to trigger error-handling and ensuring completion of
error-handling by waiting on the threadpool. I ultimately went with this
implementation since it's a bit more straight-forward wrt when to make the
different calls (i.e. "trigger disk error-handling vs trigger tablet
error-handling", instead of "trigger error-handling vs wait for
error-handling to finish"). This also has the benefit of being
extendable for other kinds of errors in the future.

Change-Id: Ie61c408a0b4424f933f40a31147568c2f906be0e
Reviewed-on: http://gerrit.cloudera.org:8080/8395
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mp...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/1c742470
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/1c742470
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/1c742470

Branch: refs/heads/master
Commit: 1c7424706a22ec93ab98086025e4154b815f32b3
Parents: 22987c7
Author: Andrew Wong <aw...@cloudera.com>
Authored: Wed Oct 25 14:15:11 2017 -0700
Committer: Andrew Wong <aw...@cloudera.com>
Committed: Thu Nov 23 00:43:02 2017 +0000

----------------------------------------------------------------------
 src/kudu/fs/CMakeLists.txt            |   2 +
 src/kudu/fs/data_dirs-test.cc         |   8 +-
 src/kudu/fs/error_manager-test.cc     | 199 +++++++++++++++++++++++++++++
 src/kudu/fs/error_manager.cc          |  84 ++++++++++++
 src/kudu/fs/error_manager.h           |  70 ++++++----
 src/kudu/fs/file_block_manager.cc     |  21 ++-
 src/kudu/fs/fs_manager.cc             |  14 +-
 src/kudu/fs/fs_manager.h              |   6 +-
 src/kudu/fs/log_block_manager-test.cc |   4 +-
 src/kudu/fs/log_block_manager.cc      |  27 ++--
 src/kudu/tserver/tablet_server.cc     |   8 +-
 src/kudu/util/status.h                |  10 ++
 12 files changed, 392 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/1c742470/src/kudu/fs/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/fs/CMakeLists.txt b/src/kudu/fs/CMakeLists.txt
index 0074082..b4dda2b 100644
--- a/src/kudu/fs/CMakeLists.txt
+++ b/src/kudu/fs/CMakeLists.txt
@@ -31,6 +31,7 @@ add_library(kudu_fs
   block_manager_metrics.cc
   block_manager_util.cc
   data_dirs.cc
+  error_manager.cc
   file_block_manager.cc
   fs_manager.cc
   fs_report.cc
@@ -56,6 +57,7 @@ ADD_KUDU_TEST(block_manager-test)
 ADD_KUDU_TEST(block_manager_util-test)
 ADD_KUDU_TEST(block_manager-stress-test RUN_SERIAL true)
 ADD_KUDU_TEST(data_dirs-test)
+ADD_KUDU_TEST(error_manager-test)
 ADD_KUDU_TEST(fs_manager-test)
 if (NOT APPLE)
   # Will only pass on Linux.

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c742470/src/kudu/fs/data_dirs-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/data_dirs-test.cc b/src/kudu/fs/data_dirs-test.cc
index e658157..d35facc 100644
--- a/src/kudu/fs/data_dirs-test.cc
+++ b/src/kudu/fs/data_dirs-test.cc
@@ -394,8 +394,8 @@ TEST_F(DataDirManagerTest, TestOpenWithFailedDirs) {
   for (const string& test_root : test_roots_) {
     ASSERT_OK(env_->CreateDir(test_root));
   }
-  ASSERT_OK(DataDirManager::CreateNewForTests(env_, test_roots_,
-      DataDirManagerOptions(), &dd_manager_));
+  ASSERT_OK(DataDirManager::CreateNewForTests(
+      env_, test_roots_, DataDirManagerOptions(), &dd_manager_));
 
   // Kill the first non-metadata directory.
   FLAGS_crash_on_eio = false;
@@ -441,8 +441,8 @@ TEST_F(TooManyDataDirManagerTest, TestTooManyInternedStrings) {
   for (const auto& r : test_roots_) {
     ASSERT_OK(env_->CreateDir(r));
   }
-  ASSERT_OK(DataDirManager::CreateNewForTests(
-      env_, test_roots_, DataDirManagerOptions(), &dd_manager_));
+  ASSERT_OK(DataDirManager::CreateNewForTests(env_, test_roots_,
+      DataDirManagerOptions(), &dd_manager_));
 }
 
 } // namespace fs

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c742470/src/kudu/fs/error_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/error_manager-test.cc b/src/kudu/fs/error_manager-test.cc
new file mode 100644
index 0000000..0c06f0f
--- /dev/null
+++ b/src/kudu/fs/error_manager-test.cc
@@ -0,0 +1,199 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <map>
+#include <memory>
+#include <set>
+#include <sstream>
+#include <stdlib.h>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/fs/error_manager.h"
+#include "kudu/gutil/bind.h"
+#include "kudu/gutil/bind_helpers.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/threading/thread_collision_warner.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/test_util.h"
+
+using std::map;
+using std::set;
+using std::string;
+using std::thread;
+using std::unique_ptr;
+using std::vector;
+
+const int kVecSize = 10;  // Size of test_vec_ and number of threads in TestSerialization.
+
+namespace kudu {
+namespace fs {
+
+// Fixture whose callbacks all update one shared vector, so that a lack of
+// callback serialization shows up as clobbered or unfilled entries.
+class FsErrorManagerTest : public KuduTest {
+ public:
+  void SetUp() override {
+    KuduTest::SetUp();
+    em_.reset(new FsErrorManager());
+    test_vec_.resize(kVecSize, -1);
+  }
+
+  // Returns a stringified version of the single vector that each thread is
+  // updating.
+  string test_vec_string() {
+    return JoinInts(test_vec_, " ");
+  }
+
+  // Sleeps for a random amount of time, strictly less than 500ms.
+  void SleepForRand() {
+    SleepFor(MonoDelta::FromMilliseconds(rand() % 500));
+  }
+
+  // Returns the index of the first instance of 'k' in test_vec_, or -1 if
+  // 'k' is not present.
+  int FindFirst(int k) {
+    // Index with size_t: size() is unsigned, so an int index mixes signedness.
+    for (size_t i = 0; i < test_vec_.size(); i++) {
+      if (test_vec_[i] == k) {
+        return static_cast<int>(i);
+      }
+    }
+    return -1;
+  }
+
+  // Writes i to the first available (-1) entry in test_vec_. The entry is
+  // chosen before sleeping a random amount of time, so if multiple calls run
+  // at the same time, it is likely that some will write to the same entry.
+  //
+  // NOTE: this can be curried into an ErrorNotificationCb.
+  void SleepAndWriteFirstEmptyCb(int i, const string& /* s */) {
+    DFAKE_SCOPED_LOCK(fake_lock_);
+    int first_available = FindFirst(-1);
+    SleepForRand();
+    if (first_available == -1) {
+      LOG(INFO) << "No available entries!";
+      return;
+    }
+    test_vec_[first_available] = i;
+  }
+
+  // Returns a map between each unique value in test_vec_ and the indices
+  // within test_vec_ at which the value is located.
+  map<int, set<int>> GetPositions() {
+    map<int, set<int>> positions;
+    for (size_t i = 0; i < test_vec_.size(); i++) {
+      positions[test_vec_[i]].insert(static_cast<int>(i));
+    }
+    return positions;
+  }
+
+  FsErrorManager* em() const { return em_.get(); }
+
+ protected:
+  // The single vector that the error notification callbacks will all write to.
+  vector<int> test_vec_;
+
+ private:
+  unique_ptr<FsErrorManager> em_;
+
+  // Fake lock used to ensure threads don't run error-handling at the same time.
+  DFAKE_MUTEX(fake_lock_);
+};
+
+// Tests the basic functionality of the error manager: registering callbacks,
+// running them, and unregistering them, one handler type at a time.
+TEST_F(FsErrorManagerTest, TestBasicRegistration) {
+  // No callback has run yet, so test_vec_ still holds only '-1's.
+  ASSERT_EQ(-1, FindFirst(ErrorHandlerType::DISK));
+  ASSERT_EQ(-1, FindFirst(ErrorHandlerType::TABLET));
+
+  // Register a DISK callback that writes ErrorHandlerType::DISK (i.e. 0) into
+  // the first '-1' entry of test_vec_ after sleeping a random amount of time.
+  em()->SetErrorNotificationCb(ErrorHandlerType::DISK,
+      Bind(&FsErrorManagerTest::SleepAndWriteFirstEmptyCb,
+           Unretained(this), ErrorHandlerType::DISK));
+  em()->RunErrorNotificationCb(ErrorHandlerType::DISK, "");
+  ASSERT_EQ(0, FindFirst(ErrorHandlerType::DISK));
+
+  // No TABLET callback is registered yet, so running it should change nothing.
+  em()->RunErrorNotificationCb(ErrorHandlerType::TABLET, "");
+  ASSERT_EQ(0, FindFirst(ErrorHandlerType::DISK));
+  ASSERT_EQ(-1, FindFirst(ErrorHandlerType::TABLET));
+
+  // Now register the TABLET callback; its run should fill the second entry.
+  em()->SetErrorNotificationCb(ErrorHandlerType::TABLET,
+      Bind(&FsErrorManagerTest::SleepAndWriteFirstEmptyCb,
+           Unretained(this), ErrorHandlerType::TABLET));
+  em()->RunErrorNotificationCb(ErrorHandlerType::TABLET, "");
+  ASSERT_EQ(1, FindFirst(ErrorHandlerType::TABLET));
+
+  // Now unregister DISK. Only the TABLET run below should write to test_vec_.
+  em()->UnsetErrorNotificationCb(ErrorHandlerType::DISK);
+  em()->RunErrorNotificationCb(ErrorHandlerType::DISK, "");
+  em()->RunErrorNotificationCb(ErrorHandlerType::TABLET, "");
+
+  LOG(INFO) << "Final state of the vector: " << test_vec_string();
+  map<int, set<int>> positions = GetPositions();
+  set<int> disk_set = { 0 };        // Index 0 should hold the one DISK write...
+  set<int> tablet_set = { 1, 2 };   // ...and indices 1 and 2 the TABLET writes.
+  ASSERT_EQ(disk_set, FindOrDie(positions, ErrorHandlerType::DISK));
+  ASSERT_EQ(tablet_set, FindOrDie(positions, ErrorHandlerType::TABLET));
+}
+
+// Test that the callbacks get run serially.
+TEST_F(FsErrorManagerTest, TestSerialization) {
+  em()->SetErrorNotificationCb(ErrorHandlerType::DISK,
+      Bind(&FsErrorManagerTest::SleepAndWriteFirstEmptyCb,
+           Unretained(this), ErrorHandlerType::DISK));
+  em()->SetErrorNotificationCb(ErrorHandlerType::TABLET,
+      Bind(&FsErrorManagerTest::SleepAndWriteFirstEmptyCb,
+           Unretained(this), ErrorHandlerType::TABLET));
+
+  // Swap back and forth between error-handler type.
+  const auto IntToEnum = [] (int i) {
+    return i % 2 == 0 ? ErrorHandlerType::DISK : ErrorHandlerType::TABLET;
+  };
+
+  vector<thread> cb_threads;
+  for (int i = 0; i < kVecSize; i++) {
+    // Each call will update the first available entry in test_vec_ after
+    // waiting a random amount of time. Without proper serialization, these
+    // could end up writing to the same entry.
+    cb_threads.emplace_back([&,i] {
+      em()->RunErrorNotificationCb(IntToEnum(i), "");
+    });
+  }
+  for (auto& t : cb_threads) {
+    t.join();
+  }
+  LOG(INFO) << "Final state of the vector: " << test_vec_string();
+
+  // Because the callbacks ran serially, every thread should have claimed a
+  // unique slot in test_vec_, so no '-1' entry may remain. The order of the
+  // calls does not matter. Use a gtest assertion (not CHECK) so that a
+  // violation is reported as a test failure instead of aborting the process.
+  ASSERT_EQ(-1, FindFirst(-1));
+}
+
+}  // namespace fs
+}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c742470/src/kudu/fs/error_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/error_manager.cc b/src/kudu/fs/error_manager.cc
new file mode 100644
index 0000000..dd83f18
--- /dev/null
+++ b/src/kudu/fs/error_manager.cc
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <mutex>
+#include <ostream>
+#include <string>
+#include <utility>
+
+#include <glog/logging.h>
+
+#include "kudu/fs/error_manager.h"
+#include "kudu/gutil/bind.h"
+
+using std::string;
+
+namespace kudu {
+
+namespace fs {
+
+// Default no-op callback, installed whenever no real handler is registered.
+static void DoNothingErrorNotification(const string& /* uuid */) {}
+
+// Starts with both handler slots pointing at the no-op callback.
+FsErrorManager::FsErrorManager()
+    : disk_cb_(Bind(DoNothingErrorNotification)), tablet_cb_(Bind(DoNothingErrorNotification)) {}
+
+// Installs 'cb' as the handler for errors of type 'e', replacing whatever
+// was registered before. Takes 'lock_', which RunErrorNotificationCb() also
+// holds, so the swap cannot happen while a callback is running.
+void FsErrorManager::SetErrorNotificationCb(ErrorHandlerType e, ErrorNotificationCb cb) {
+  std::lock_guard<Mutex> guard(lock_);
+  if (e == ErrorHandlerType::DISK) {
+    disk_cb_ = std::move(cb);
+  } else if (e == ErrorHandlerType::TABLET) {
+    tablet_cb_ = std::move(cb);
+  } else {
+    LOG(FATAL) << "Unknown error handler type!";
+  }
+}
+
+// Restores the no-op callback for errors of type 'e'. Takes 'lock_', which
+// RunErrorNotificationCb() also holds, so the reset cannot happen while a
+// callback is running.
+void FsErrorManager::UnsetErrorNotificationCb(ErrorHandlerType e) {
+  std::lock_guard<Mutex> guard(lock_);
+  if (e == ErrorHandlerType::DISK) {
+    disk_cb_ = Bind(DoNothingErrorNotification);
+  } else if (e == ErrorHandlerType::TABLET) {
+    tablet_cb_ = Bind(DoNothingErrorNotification);
+  } else {
+    LOG(FATAL) << "Unknown error handler type!";
+  }
+}
+
+// Invokes the callback registered for errors of type 'e', passing 'uuid'.
+// 'lock_' is held for the entire call, so concurrent invocations (of either
+// handler type) run one at a time and each finishes before this returns.
+void FsErrorManager::RunErrorNotificationCb(ErrorHandlerType e, const string& uuid) const {
+  std::lock_guard<Mutex> guard(lock_);
+  if (e == ErrorHandlerType::DISK) {
+    disk_cb_.Run(uuid);
+  } else if (e == ErrorHandlerType::TABLET) {
+    tablet_cb_.Run(uuid);
+  } else {
+    LOG(FATAL) << "Unknown error handler type!";
+  }
+}
+
+}  // namespace fs
+}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c742470/src/kudu/fs/error_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/error_manager.h b/src/kudu/fs/error_manager.h
index cf0fdf7..a4e61c1 100644
--- a/src/kudu/fs/error_manager.h
+++ b/src/kudu/fs/error_manager.h
@@ -17,16 +17,16 @@
 
 #pragma once
 
-#include <map>
 #include <string>
 
+#include <glog/logging.h>
+
 #include "kudu/fs/block_manager_util.h"
 #include "kudu/fs/data_dirs.h"
-#include "kudu/gutil/bind.h"
-#include "kudu/gutil/callback_forward.h"
-#include "kudu/gutil/strings/substitute.h"
-#include "kudu/util/fault_injection.h"
-#include "kudu/util/status.h"
+#include "kudu/fs/fs.pb.h"
+#include "kudu/gutil/callback.h"
+#include "kudu/gutil/port.h"
+#include "kudu/util/mutex.h"
 
 namespace kudu {
 namespace fs {
@@ -37,7 +37,6 @@ namespace fs {
 // e.g. the ErrorNotificationCb for disk failure handling takes the UUID of a
 // directory, marks it failed, and shuts down the tablets in that directory.
 typedef Callback<void(const std::string&)> ErrorNotificationCb;
-static void DoNothingErrorNotification(const std::string& /* uuid */) {}
 
 // Evaluates the expression and handles it if it results in an error.
 // Returns if the status is an error.
@@ -72,47 +71,70 @@ static void DoNothingErrorNotification(const std::string& /* uuid */) {}
   } \
 } while (0);
 
-// When certain operations fail, the side effects of the error can span
-// multiple layers, many of which we prefer to keep separate. The FsErrorManager
+enum ErrorHandlerType {
+  // For errors that affect a disk and all of its tablets (e.g. disk failure).
+  DISK,  // Callers pass the UUID of the failed data directory.
+
+  // For errors that affect a single tablet (e.g. failure to create a block).
+  TABLET  // Block managers pass the tablet ID from CreateBlockOptions.
+};
+
+// When certain operations fail, the side effects of the error can span multiple
+// layers, many of which we prefer to keep separate. The FsErrorManager
 // registers and runs error handlers without adding cross-layer dependencies.
+// Additionally, it enforces that only one callback runs at a time, and that
+// each callback fully completes before returning.
 //
 // e.g. the TSTabletManager registers a callback to handle disk failure.
 // Blocks and other entities that may hit disk failures can call it without
 // knowing about the TSTabletManager.
 class FsErrorManager {
  public:
-  FsErrorManager()
-    : notify_cb_(Bind(DoNothingErrorNotification)) {}
+  // TODO(awong): Register an actual error-handling function for tablet. Some
+  // errors may surface indirectly due to disk errors, but may not
+  // necessarily be caused by the failure of a specific disk.
+  //
+  // For example, if all of the disks in a tablet's directory group have
+  // already failed, the tablet would not be able to create a new block and
+  // return an error, despite CreateNewBlock() not actually touching disk.
+  // Before CreateNewBlock() returns, in order to satisfy various safety
+  // checks surrounding the state of tablet post-failure, it must wait for
+  // disk failure handling of the failed disks to return.
+  //
+  // While this callback is a no-op, it serves to enforce that any
+  // error-handling caused by ERROR1 that may have indirectly caused ERROR2
+  // must complete before ERROR2 can be returned to its caller.
+  FsErrorManager();
 
   // Sets the error notification callback.
   //
   // This should be called when the callback's callee is initialized.
-  void SetErrorNotificationCb(ErrorNotificationCb cb) {
-    notify_cb_ = std::move(cb);
-  }
+  void SetErrorNotificationCb(ErrorHandlerType e, ErrorNotificationCb cb);
 
   // Resets the error notification callback.
   //
   // This must be called before the callback's callee is destroyed.
-  void UnsetErrorNotificationCb() {
-    notify_cb_ = Bind(DoNothingErrorNotification);
-  }
+  void UnsetErrorNotificationCb(ErrorHandlerType e);
 
   // Runs the error notification callback.
   //
   // 'uuid' is the full UUID of the component that failed.
-  void RunErrorNotificationCb(const std::string& uuid) const {
-    notify_cb_.Run(uuid);
-  }
+  void RunErrorNotificationCb(ErrorHandlerType e, const std::string& uuid) const;
 
   // Runs the error notification callback with the UUID of 'dir'.
-  void RunErrorNotificationCb(const DataDir* dir) const {
-    notify_cb_.Run(dir->instance()->metadata()->path_set().uuid());
+  void RunErrorNotificationCb(ErrorHandlerType e, const DataDir* dir) const {
+    DCHECK_EQ(e, ErrorHandlerType::DISK);
+    RunErrorNotificationCb(e, dir->instance()->metadata()->path_set().uuid());
   }
 
  private:
-   // Callback to be run when an error occurs.
-   ErrorNotificationCb notify_cb_;
+   // Callbacks to be run when an error occurs.
+  ErrorNotificationCb disk_cb_;
+  ErrorNotificationCb tablet_cb_;
+
+   // Protects calls to notifications, enforcing that a single callback runs at
+   // a time.
+   mutable Mutex lock_;
 };
 
 }  // namespace fs

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c742470/src/kudu/fs/file_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/file_block_manager.cc b/src/kudu/fs/file_block_manager.cc
index dd69c5c..d5452f2 100644
--- a/src/kudu/fs/file_block_manager.cc
+++ b/src/kudu/fs/file_block_manager.cc
@@ -313,7 +313,8 @@ FileWritableBlock::~FileWritableBlock() {
 
 void FileWritableBlock::HandleError(const Status& s) const {
   HANDLE_DISK_FAILURE(
-      s, block_manager_->error_manager()->RunErrorNotificationCb(location_.data_dir()));
+      s, block_manager_->error_manager()->RunErrorNotificationCb(
+          ErrorHandlerType::DISK, location_.data_dir()));
 }
 
 Status FileWritableBlock::Close() {
@@ -471,7 +472,8 @@ class FileReadableBlock : public ReadableBlock {
 void FileReadableBlock::HandleError(const Status& s) const {
   const DataDir* dir = block_manager_->dd_manager_->FindDataDirByUuidIndex(
       internal::FileBlockLocation::GetDataDirIdx(block_id_));
-  HANDLE_DISK_FAILURE(s, block_manager_->error_manager()->RunErrorNotificationCb(dir));
+  HANDLE_DISK_FAILURE(s, block_manager_->error_manager()->RunErrorNotificationCb(
+      ErrorHandlerType::DISK, dir));
 }
 
 FileReadableBlock::FileReadableBlock(FileBlockManager* block_manager,
@@ -666,7 +668,8 @@ Status FileBlockManager::SyncMetadata(const internal::FileBlockLocation& locatio
     for (const string& s : to_sync) {
       if (metrics_) metrics_->total_disk_sync->Increment();
       RETURN_NOT_OK_HANDLE_DISK_FAILURE(env_->SyncDir(s),
-          error_manager_->RunErrorNotificationCb(location.data_dir()));
+          error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK,
+                                                 location.data_dir()));
     }
   }
   return Status::OK();
@@ -740,7 +743,8 @@ Status FileBlockManager::CreateBlock(const CreateBlockOptions& opts,
   CHECK(!opts_.read_only);
 
   DataDir* dir;
-  RETURN_NOT_OK(dd_manager_->GetNextDataDir(opts, &dir));
+  RETURN_NOT_OK_EVAL(dd_manager_->GetNextDataDir(opts, &dir),
+      error_manager_->RunErrorNotificationCb(ErrorHandlerType::TABLET, opts.tablet_id));
   int uuid_idx;
   CHECK(dd_manager_->FindUuidIndexByDataDir(dir, &uuid_idx));
 
@@ -775,7 +779,8 @@ Status FileBlockManager::CreateBlock(const CreateBlockOptions& opts,
     // We could create a block in a different directory, but there's currently
     // no point in doing so. On disk failure, the tablet specified by 'opts'
     // will be shut down, so the returned block would not be used.
-    RETURN_NOT_OK_HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb(dir));
+    RETURN_NOT_OK_HANDLE_DISK_FAILURE(s,
+        error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK, dir));
     WritableFileOptions wr_opts;
     wr_opts.mode = Env::CREATE_NON_EXISTING;
     s = env_util::OpenFileForWrite(wr_opts, env_, path, &writer);
@@ -794,7 +799,8 @@ Status FileBlockManager::CreateBlock(const CreateBlockOptions& opts,
     }
     block->reset(new internal::FileWritableBlock(this, location, writer));
   } else {
-    HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb(dir));
+    HANDLE_DISK_FAILURE(s,
+        error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK, dir));
     return s;
   }
   return Status::OK();
@@ -802,7 +808,8 @@ Status FileBlockManager::CreateBlock(const CreateBlockOptions& opts,
 
 #define RETURN_NOT_OK_FBM_DISK_FAILURE(status_expr) do { \
   RETURN_NOT_OK_HANDLE_DISK_FAILURE((status_expr), \
-      error_manager_->RunErrorNotificationCb(dd_manager_->FindDataDirByUuidIndex( \
+      error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK, \
+      dd_manager_->FindDataDirByUuidIndex( \
       internal::FileBlockLocation::GetDataDirIdx(block_id)))); \
 } while (0);
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c742470/src/kudu/fs/fs_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/fs_manager.cc b/src/kudu/fs/fs_manager.cc
index 34b81ae..230ce71 100644
--- a/src/kudu/fs/fs_manager.cc
+++ b/src/kudu/fs/fs_manager.cc
@@ -95,6 +95,8 @@ using kudu::fs::BlockManagerOptions;
 using kudu::fs::CreateBlockOptions;
 using kudu::fs::DataDirManager;
 using kudu::fs::DataDirManagerOptions;
+using kudu::fs::ErrorHandlerType;
+using kudu::fs::ErrorNotificationCb;
 using kudu::fs::FsErrorManager;
 using kudu::fs::FileBlockManager;
 using kudu::fs::FsReport;
@@ -156,12 +158,12 @@ FsManager::FsManager(Env* env, FsManagerOpts opts)
 
 FsManager::~FsManager() {}
 
-void FsManager::SetErrorNotificationCb(fs::ErrorNotificationCb cb) {
-  error_manager_->SetErrorNotificationCb(std::move(cb));
+void FsManager::SetErrorNotificationCb(ErrorHandlerType e, ErrorNotificationCb cb) {
+  error_manager_->SetErrorNotificationCb(e, std::move(cb));
 }
 
-void FsManager::UnsetErrorNotificationCb() {
-  error_manager_->UnsetErrorNotificationCb();
+void FsManager::UnsetErrorNotificationCb(ErrorHandlerType e) {
+  error_manager_->UnsetErrorNotificationCb(e);
 }
 
 Status FsManager::Init() {
@@ -374,8 +376,8 @@ Status FsManager::Open(FsReport* report) {
   }
 
   // Set an initial error handler to mark data directories as failed.
-  error_manager_->SetErrorNotificationCb(Bind(&DataDirManager::MarkDataDirFailedByUuid,
-                                              Unretained(dd_manager_.get())));
+  error_manager_->SetErrorNotificationCb(ErrorHandlerType::DISK,
+      Bind(&DataDirManager::MarkDataDirFailedByUuid, Unretained(dd_manager_.get())));
 
   // Finally, initialize and open the block manager.
   InitBlockManager();

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c742470/src/kudu/fs/fs_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/fs_manager.h b/src/kudu/fs/fs_manager.h
index cbb08a9..2c0fc21 100644
--- a/src/kudu/fs/fs_manager.h
+++ b/src/kudu/fs/fs_manager.h
@@ -141,13 +141,13 @@ class FsManager {
   //
   // If a disk failure is detected, this callback will be invoked with the
   // relevant DataDir's UUID as its input parameter.
-  void SetErrorNotificationCb(fs::ErrorNotificationCb cb);
+  void SetErrorNotificationCb(fs::ErrorHandlerType e, fs::ErrorNotificationCb cb);
 
   // Unregisters the error-handling callback with the FsErrorManager.
   //
   // This must be called before the callback's callee is destroyed. Calls to
   // this are idempotent and are safe even if a callback has not been set.
-  void UnsetErrorNotificationCb();
+  void UnsetErrorNotificationCb(fs::ErrorHandlerType e);
 
   // Create the initial filesystem layout. If 'uuid' is provided, uses it as
   // uuid of the filesystem. Otherwise generates one at random.
@@ -324,9 +324,9 @@ class FsManager {
 
   std::unique_ptr<InstanceMetadataPB> metadata_;
 
+  std::unique_ptr<fs::FsErrorManager> error_manager_;
   std::unique_ptr<fs::DataDirManager> dd_manager_;
   std::unique_ptr<fs::BlockManager> block_manager_;
-  std::unique_ptr<fs::FsErrorManager> error_manager_;
 
   bool initted_;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c742470/src/kudu/fs/log_block_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager-test.cc b/src/kudu/fs/log_block_manager-test.cc
index 856c004..eca2ccc 100644
--- a/src/kudu/fs/log_block_manager-test.cc
+++ b/src/kudu/fs/log_block_manager-test.cc
@@ -1624,8 +1624,8 @@ TEST_F(LogBlockManagerTest, TestOpenWithFailedDirectories) {
       DataDirManagerOptions(), &dd_manager_));
 
   // Wire in a callback to fail data directories.
-  test_error_manager_->SetErrorNotificationCb(Bind(&DataDirManager::MarkDataDirFailedByUuid,
-                                                   Unretained(dd_manager_.get())));
+  test_error_manager_->SetErrorNotificationCb(ErrorHandlerType::DISK,
+      Bind(&DataDirManager::MarkDataDirFailedByUuid, Unretained(dd_manager_.get())));
   bm_.reset(CreateBlockManager(nullptr));
 
   // Fail one of the directories, chosen randomly.

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c742470/src/kudu/fs/log_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index adca9f3..2958358 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -621,13 +621,13 @@ LogBlockContainer::LogBlockContainer(
 }
 
 void LogBlockContainer::HandleError(const Status& s) const {
-  HANDLE_DISK_FAILURE(
-      s, block_manager()->error_manager()->RunErrorNotificationCb(data_dir_));
+  HANDLE_DISK_FAILURE(s,
+      block_manager()->error_manager()->RunErrorNotificationCb(ErrorHandlerType::DISK, data_dir_));
 }
 
 #define RETURN_NOT_OK_CONTAINER_DISK_FAILURE(status_expr) do { \
   RETURN_NOT_OK_HANDLE_DISK_FAILURE((status_expr), \
-    block_manager->error_manager()->RunErrorNotificationCb(dir)); \
+    block_manager->error_manager()->RunErrorNotificationCb(ErrorHandlerType::DISK, dir)); \
 } while (0);
 
 Status LogBlockContainer::Create(LogBlockManager* block_manager,
@@ -691,8 +691,9 @@ Status LogBlockContainer::Create(LogBlockManager* block_manager,
   }
 
   // Prefer metadata status (arbitrarily).
-  HANDLE_DISK_FAILURE(metadata_status, block_manager->error_manager()->RunErrorNotificationCb(dir));
-  HANDLE_DISK_FAILURE(data_status, block_manager->error_manager()->RunErrorNotificationCb(dir));
+  FsErrorManager* em = block_manager->error_manager();
+  HANDLE_DISK_FAILURE(metadata_status, em->RunErrorNotificationCb(ErrorHandlerType::DISK, dir));
+  HANDLE_DISK_FAILURE(data_status, em->RunErrorNotificationCb(ErrorHandlerType::DISK, dir));
   return !metadata_status.ok() ? metadata_status : data_status;
 }
 
@@ -1926,7 +1927,8 @@ void LogBlockManager::RemoveFullContainerUnlocked(const string& container_name)
 Status LogBlockManager::GetOrCreateContainer(const CreateBlockOptions& opts,
                                              LogBlockContainer** container) {
   DataDir* dir;
-  RETURN_NOT_OK(dd_manager_->GetNextDataDir(opts, &dir));
+  RETURN_NOT_OK_EVAL(dd_manager_->GetNextDataDir(opts, &dir),
+      error_manager_->RunErrorNotificationCb(ErrorHandlerType::TABLET, opts.tablet_id));
 
   {
     std::lock_guard<simple_spinlock> l(lock_);
@@ -1945,7 +1947,7 @@ Status LogBlockManager::GetOrCreateContainer(const CreateBlockOptions& opts,
   // We could create a container in a different directory, but there's
   // currently no point in doing so. On disk failure, the tablet specified by
   // 'opts' will be shut down, so the returned container would not be used.
-  HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb(dir));
+  HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK, dir));
   RETURN_NOT_OK_PREPEND(s, "Could not create new log block container at " + dir->dir());
   {
     std::lock_guard<simple_spinlock> l(lock_);
@@ -2126,7 +2128,7 @@ Status LogBlockManager::RemoveLogBlockUnlocked(const BlockId& block_id,
 
   LogBlockContainer* container = it->second->container();
   HANDLE_DISK_FAILURE(container->read_only_status(),
-                      error_manager_->RunErrorNotificationCb(container->data_dir()));
+      error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK, container->data_dir()));
 
   // Return early if deleting a block in a failed directory.
   set<int> failed_dirs = dd_manager_->GetFailedDataDirs();
@@ -2180,7 +2182,7 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
   vector<string> children;
   Status s = env_->GetChildren(dir->dir(), &children);
   if (!s.ok()) {
-    HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb(dir));
+    HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK, dir));
     *result_status = s.CloneAndPrepend(Substitute(
         "Could not list children of $0", dir->dir()));
     return;
@@ -2333,7 +2335,7 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
       uint64_t reported_size;
       s = env_->GetFileSizeOnDisk(data_filename, &reported_size);
       if (!s.ok()) {
-        HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb(dir));
+        HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK, dir));
         *result_status = s.CloneAndPrepend(Substitute(
             "Could not get on-disk file size of container $0", container->ToString()));
         return;
@@ -2416,12 +2418,13 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
 #define RETURN_NOT_OK_LBM_DISK_FAILURE_PREPEND(status_expr, msg) do { \
   Status s_ = (status_expr); \
   s_ = s_.CloneAndPrepend(msg); \
-  RETURN_NOT_OK_HANDLE_DISK_FAILURE(s_, error_manager_->RunErrorNotificationCb(dir)); \
+  RETURN_NOT_OK_HANDLE_DISK_FAILURE(s_, \
+      error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK, dir)); \
 } while (0);
 
 #define WARN_NOT_OK_LBM_DISK_FAILURE(status_expr, msg) do { \
   Status s_ = (status_expr); \
-  HANDLE_DISK_FAILURE(s_, error_manager_->RunErrorNotificationCb(dir)); \
+  HANDLE_DISK_FAILURE(s_, error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK, dir)); \
   WARN_NOT_OK(s_, msg); \
 } while (0);
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c742470/src/kudu/tserver/tablet_server.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server.cc b/src/kudu/tserver/tablet_server.cc
index 95b2b1e..8fdb85d 100644
--- a/src/kudu/tserver/tablet_server.cc
+++ b/src/kudu/tserver/tablet_server.cc
@@ -25,6 +25,7 @@
 #include <glog/logging.h>
 
 #include "kudu/cfile/block_cache.h"
+#include "kudu/fs/error_manager.h"
 #include "kudu/fs/fs_manager.h"
 #include "kudu/gutil/bind.h"
 #include "kudu/gutil/bind_helpers.h"
@@ -42,6 +43,7 @@
 #include "kudu/util/status.h"
 
 using std::string;
+using kudu::fs::ErrorHandlerType;
 using kudu::rpc::ServiceIf;
 
 namespace kudu {
@@ -112,8 +114,8 @@ Status TabletServer::WaitInited() {
 Status TabletServer::Start() {
   CHECK(initted_);
 
-  fs_manager_->SetErrorNotificationCb(Bind(&TSTabletManager::FailTabletsInDataDir,
-                                           Unretained(tablet_manager_.get())));
+  fs_manager_->SetErrorNotificationCb(ErrorHandlerType::DISK,
+      Bind(&TSTabletManager::FailTabletsInDataDir, Unretained(tablet_manager_.get())));
 
   gscoped_ptr<ServiceIf> ts_service(new TabletServiceImpl(this));
   gscoped_ptr<ServiceIf> admin_service(new TabletServiceAdminImpl(this));
@@ -146,7 +148,7 @@ void TabletServer::Shutdown() {
     // 2. Shut down the tserver's subsystems.
     maintenance_manager_->Shutdown();
     WARN_NOT_OK(heartbeater_->Stop(), "Failed to stop TS Heartbeat thread");
-    fs_manager_->UnsetErrorNotificationCb();
+    fs_manager_->UnsetErrorNotificationCb(ErrorHandlerType::DISK);
     tablet_manager_->Shutdown();
 
     // 3. Shut down generic subsystems.

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c742470/src/kudu/util/status.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/status.h b/src/kudu/util/status.h
index 6318e6d..afcdc8f 100644
--- a/src/kudu/util/status.h
+++ b/src/kudu/util/status.h
@@ -52,6 +52,15 @@
     if (PREDICT_FALSE(!s.ok())) return (to_return);  \
   } while (0);
 
+/// @brief If @c s is not OK, evaluate @c on_error and then return @c s.
+#define KUDU_RETURN_NOT_OK_EVAL(s, on_error) do { \
+    const ::kudu::Status& _s = (s); \
+    if (PREDICT_FALSE(!_s.ok())) { \
+      (on_error); \
+      return _s; \
+    } \
+  } while (0);
+
 /// @brief Emit a warning if @c to_call returns a bad status.
 #define KUDU_WARN_NOT_OK(to_call, warning_prefix) do { \
     const ::kudu::Status& _s = (to_call);              \
@@ -115,6 +124,7 @@
 #define RETURN_NOT_OK         KUDU_RETURN_NOT_OK
 #define RETURN_NOT_OK_PREPEND KUDU_RETURN_NOT_OK_PREPEND
 #define RETURN_NOT_OK_RET     KUDU_RETURN_NOT_OK_RET
+#define RETURN_NOT_OK_EVAL    KUDU_RETURN_NOT_OK_EVAL
 #define WARN_NOT_OK           KUDU_WARN_NOT_OK
 #define LOG_AND_RETURN        KUDU_LOG_AND_RETURN
 #define RETURN_NOT_OK_LOG     KUDU_RETURN_NOT_OK_LOG