You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2018/08/29 01:52:34 UTC

kudu git commit: error_manager: rename error types

Repository: kudu
Updated Branches:
  refs/heads/master 2974f5a50 -> 6ec770928


error_manager: rename error types

The previous ErrorHandlerType enum names weren't very descriptive, and
reflected the expected error handling (e.g. fail a disk, fail a tablet),
rather than the error itself. In preparation of adding a new error
ErrorHandlerType for CFile checksum errors, this patch attempts to
clarify usage of the FsErrorManager.

As I intend on adding another error type, I updated the error manager to
use a map to store its error-handling callbacks, and added a map utility
function to facilitate this.

Change-Id: I377904ee51bb0f7b0f1104d7a4723f74d52e6e3f
Reviewed-on: http://gerrit.cloudera.org:8080/11303
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/6ec77092
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/6ec77092
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/6ec77092

Branch: refs/heads/master
Commit: 6ec77092808e2ddaec2d2cfd8a44b740b43dafa9
Parents: 2974f5a
Author: Andrew Wong <aw...@cloudera.com>
Authored: Thu Aug 23 00:55:10 2018 -0700
Committer: Andrew Wong <aw...@cloudera.com>
Committed: Wed Aug 29 01:52:14 2018 +0000

----------------------------------------------------------------------
 src/kudu/fs/error_manager-test.cc     | 48 ++++++++++++++--------------
 src/kudu/fs/error_manager.cc          | 48 ++++++----------------------
 src/kudu/fs/error_manager.h           | 50 +++++++++++++++++-------------
 src/kudu/fs/file_block_manager.cc     | 14 ++++-----
 src/kudu/fs/fs_manager.cc             |  2 +-
 src/kudu/fs/log_block_manager-test.cc |  2 +-
 src/kudu/fs/log_block_manager.cc      | 27 +++++++++-------
 src/kudu/gutil/map-util.h             | 17 ++++++++++
 src/kudu/tserver/tablet_server.cc     |  4 +--
 src/kudu/util/map-util-test.cc        | 16 ++++++++--
 10 files changed, 120 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/6ec77092/src/kudu/fs/error_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/error_manager-test.cc b/src/kudu/fs/error_manager-test.cc
index 0c06f0f..33a4866 100644
--- a/src/kudu/fs/error_manager-test.cc
+++ b/src/kudu/fs/error_manager-test.cc
@@ -124,54 +124,54 @@ class FsErrorManagerTest : public KuduTest {
 // callbacks) of the error manager.
 TEST_F(FsErrorManagerTest, TestBasicRegistration) {
   // Before registering anything, there should be all '-1's in test_vec_.
-  ASSERT_EQ(-1, FindFirst(ErrorHandlerType::DISK));
-  ASSERT_EQ(-1, FindFirst(ErrorHandlerType::TABLET));
+  ASSERT_EQ(-1, FindFirst(ErrorHandlerType::DISK_ERROR));
+  ASSERT_EQ(-1, FindFirst(ErrorHandlerType::NO_AVAILABLE_DISKS));
 
   // Register a callback to update the first '-1' entry in test_vec_ to '0'
   // after waiting a random amount of time.
-  em()->SetErrorNotificationCb(ErrorHandlerType::DISK,
+  em()->SetErrorNotificationCb(ErrorHandlerType::DISK_ERROR,
       Bind(&FsErrorManagerTest::SleepAndWriteFirstEmptyCb,
-           Unretained(this), ErrorHandlerType::DISK));
-  em()->RunErrorNotificationCb(ErrorHandlerType::DISK, "");
-  ASSERT_EQ(0, FindFirst(ErrorHandlerType::DISK));
+           Unretained(this), ErrorHandlerType::DISK_ERROR));
+  em()->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR, "");
+  ASSERT_EQ(0, FindFirst(ErrorHandlerType::DISK_ERROR));
 
   // Running callbacks that haven't been registered should do nothing.
-  em()->RunErrorNotificationCb(ErrorHandlerType::TABLET, "");
-  ASSERT_EQ(0, FindFirst(ErrorHandlerType::DISK));
-  ASSERT_EQ(-1, FindFirst(ErrorHandlerType::TABLET));
+  em()->RunErrorNotificationCb(ErrorHandlerType::NO_AVAILABLE_DISKS, "");
+  ASSERT_EQ(0, FindFirst(ErrorHandlerType::DISK_ERROR));
+  ASSERT_EQ(-1, FindFirst(ErrorHandlerType::NO_AVAILABLE_DISKS));
 
   // Now register another callback.
-  em()->SetErrorNotificationCb(ErrorHandlerType::TABLET,
+  em()->SetErrorNotificationCb(ErrorHandlerType::NO_AVAILABLE_DISKS,
       Bind(&FsErrorManagerTest::SleepAndWriteFirstEmptyCb,
-           Unretained(this), ErrorHandlerType::TABLET));
-  em()->RunErrorNotificationCb(ErrorHandlerType::TABLET, "");
-  ASSERT_EQ(1, FindFirst(ErrorHandlerType::TABLET));
+           Unretained(this), ErrorHandlerType::NO_AVAILABLE_DISKS));
+  em()->RunErrorNotificationCb(ErrorHandlerType::NO_AVAILABLE_DISKS, "");
+  ASSERT_EQ(1, FindFirst(ErrorHandlerType::NO_AVAILABLE_DISKS));
 
   // Now unregister one of the callbacks. This should not affect the other.
-  em()->UnsetErrorNotificationCb(ErrorHandlerType::DISK);
-  em()->RunErrorNotificationCb(ErrorHandlerType::DISK, "");
-  em()->RunErrorNotificationCb(ErrorHandlerType::TABLET, "");
+  em()->UnsetErrorNotificationCb(ErrorHandlerType::DISK_ERROR);
+  em()->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR, "");
+  em()->RunErrorNotificationCb(ErrorHandlerType::NO_AVAILABLE_DISKS, "");
 
   LOG(INFO) << "Final state of the vector: " << test_vec_string();
   map<int, set<int>> positions = GetPositions();
   set<int> disk_set = { 0 };        // The first entry should be DISK...
-  set<int> tablet_set = { 1, 2 };   // ...followed by TABLET, TABLET.
-  ASSERT_EQ(disk_set, FindOrDie(positions, ErrorHandlerType::DISK));
-  ASSERT_EQ(tablet_set, FindOrDie(positions, ErrorHandlerType::TABLET));
+  set<int> tablet_set = { 1, 2 };   // ...followed by NO_AVAILABLE_DISKS, NO_AVAILABLE_DISKS.
+  ASSERT_EQ(disk_set, FindOrDie(positions, ErrorHandlerType::DISK_ERROR));
+  ASSERT_EQ(tablet_set, FindOrDie(positions, ErrorHandlerType::NO_AVAILABLE_DISKS));
 }
 
 // Test that the callbacks get run serially.
 TEST_F(FsErrorManagerTest, TestSerialization) {
-  em()->SetErrorNotificationCb(ErrorHandlerType::DISK,
+  em()->SetErrorNotificationCb(ErrorHandlerType::DISK_ERROR,
       Bind(&FsErrorManagerTest::SleepAndWriteFirstEmptyCb,
-           Unretained(this), ErrorHandlerType::DISK));
-  em()->SetErrorNotificationCb(ErrorHandlerType::TABLET,
+           Unretained(this), ErrorHandlerType::DISK_ERROR));
+  em()->SetErrorNotificationCb(ErrorHandlerType::NO_AVAILABLE_DISKS,
       Bind(&FsErrorManagerTest::SleepAndWriteFirstEmptyCb,
-           Unretained(this), ErrorHandlerType::TABLET));
+           Unretained(this), ErrorHandlerType::NO_AVAILABLE_DISKS));
 
   // Swap back and forth between error-handler type.
   const auto IntToEnum = [&] (int i) {
-    return i % 2 == 0 ? ErrorHandlerType::DISK : ErrorHandlerType::TABLET;
+    return i % 2 == 0 ? ErrorHandlerType::DISK_ERROR : ErrorHandlerType::NO_AVAILABLE_DISKS;
   };
 
   vector<thread> cb_threads;

http://git-wip-us.apache.org/repos/asf/kudu/blob/6ec77092/src/kudu/fs/error_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/error_manager.cc b/src/kudu/fs/error_manager.cc
index dd83f18..0d69148 100644
--- a/src/kudu/fs/error_manager.cc
+++ b/src/kudu/fs/error_manager.cc
@@ -15,69 +15,41 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include "kudu/fs/error_manager.h"
+
 #include <mutex>
-#include <ostream>
 #include <string>
 #include <utility>
 
-#include <glog/logging.h>
-
-#include "kudu/fs/error_manager.h"
 #include "kudu/gutil/bind.h"
+#include "kudu/gutil/map-util.h"
 
 using std::string;
 
 namespace kudu {
-
 namespace fs {
 
 // Default error-handling callback that no-ops.
 static void DoNothingErrorNotification(const string& /* uuid */) {}
 
-FsErrorManager::FsErrorManager() :
-  disk_cb_(Bind(DoNothingErrorNotification)),
-  tablet_cb_(Bind(DoNothingErrorNotification)) {}
+FsErrorManager::FsErrorManager() {
+  InsertOrDie(&callbacks_, ErrorHandlerType::DISK_ERROR, Bind(DoNothingErrorNotification));
+  InsertOrDie(&callbacks_, ErrorHandlerType::NO_AVAILABLE_DISKS, Bind(DoNothingErrorNotification));
+}
 
 void FsErrorManager::SetErrorNotificationCb(ErrorHandlerType e, ErrorNotificationCb cb) {
   std::lock_guard<Mutex> l(lock_);
-  switch (e) {
-    case ErrorHandlerType::DISK:
-      disk_cb_ = std::move(cb);
-      return;
-    case ErrorHandlerType::TABLET:
-      tablet_cb_ = std::move(cb);
-      return;
-    default:
-      LOG(FATAL) << "Unknown error handler type!";
-  }
+  EmplaceOrUpdate(&callbacks_, e, std::move(cb));
 }
 
 void FsErrorManager::UnsetErrorNotificationCb(ErrorHandlerType e) {
   std::lock_guard<Mutex> l(lock_);
-  switch (e) {
-    case ErrorHandlerType::DISK:
-      disk_cb_ = Bind(DoNothingErrorNotification);
-      return;
-    case ErrorHandlerType::TABLET:
-      tablet_cb_ = Bind(DoNothingErrorNotification);
-      return;
-    default:
-      LOG(FATAL) << "Unknown error handler type!";
-  }
+  EmplaceOrUpdate(&callbacks_, e, Bind(DoNothingErrorNotification));
 }
 
 void FsErrorManager::RunErrorNotificationCb(ErrorHandlerType e, const string& uuid) const {
   std::lock_guard<Mutex> l(lock_);
-  switch (e) {
-    case ErrorHandlerType::DISK:
-      disk_cb_.Run(uuid);
-      return;
-    case ErrorHandlerType::TABLET:
-      tablet_cb_.Run(uuid);
-      return;
-    default:
-      LOG(FATAL) << "Unknown error handler type!";
-  }
+  FindOrDie(callbacks_, e).Run(uuid);
 }
 
 }  // namespace fs

http://git-wip-us.apache.org/repos/asf/kudu/blob/6ec77092/src/kudu/fs/error_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/error_manager.h b/src/kudu/fs/error_manager.h
index a4e61c1..61718f4 100644
--- a/src/kudu/fs/error_manager.h
+++ b/src/kudu/fs/error_manager.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include <string>
+#include <unordered_map>
 
 #include <glog/logging.h>
 
@@ -72,11 +73,33 @@ typedef Callback<void(const std::string&)> ErrorNotificationCb;
 } while (0);
 
 enum ErrorHandlerType {
-  // For errors that affect a disk and all of its tablets (e.g. disk failure).
-  DISK,
+  // For disk failures.
+  DISK_ERROR,
 
-  // For errors that affect a single tablet (e.g. failure to create a block).
-  TABLET
+  // For errors that caused by no data dirs being available (e.g. if all disks
+  // are full or failed when creating a block).
+  //
+  // TODO(awong): Register an actual error-handling callback for
+  // NO_AVAILABLE_DISKS. Some errors may surface indirectly due to disk errors,
+  // but may not have touched disk, and thus may have not called the DISK_ERROR
+  // error handler.
+  //
+  // For example, if all of the disks in a tablet's directory group have
+  // already failed due to disk errors, the tablet would not be able to create
+  // a new block and return an error, despite CreateNewBlock() not actually
+  // touching disk and triggering running error handling. Callers of
+  // CreateNewBlock() will expect that if an error is returned, it has been
+  // handled, and may hit a CHECK failure otherwise. As such, before returning
+  // an error, CreateNewBlock() must wait for any in-flight error handling to
+  // finish.
+  //
+  // While this currently runs a no-op, it serves to enforce that any
+  // error-handling caused by ERROR1 that may have indirectly caused ERROR2
+  // (e.g. if ERROR1 is a disk error of the only disk on the server, and ERROR2
+  // is the subsequent failure to create a block because all disks have been
+  // marked as failed) must complete before ERROR2 can be returned to its
+  // caller.
+  NO_AVAILABLE_DISKS,
 };
 
 // When certain operations fail, the side effects of the error can span multiple
@@ -90,20 +113,6 @@ enum ErrorHandlerType {
 // knowing about the TSTabletManager.
 class FsErrorManager {
  public:
-  // TODO(awong): Register an actual error-handling function for tablet. Some
-  // errors may surface indirectly due to disk errors, but may not
-  // necessarily be caused by the failure of a specific disk.
-  //
-  // For example, if all of the disks in a tablet's directory group have
-  // already failed, the tablet would not be able to create a new block and
-  // return an error, despite CreateNewBlock() not actually touching disk.
-  // Before CreateNewBlock() returns, in order to satisfy various saftey
-  // checks surrounding the state of tablet post-failure, it must wait for
-  // disk failure handling of the failed disks to return.
-  //
-  // While this callback is a no-op, it serves to enforce that any
-  // error-handling caused by ERROR1 that may have indirectly caused ERROR2
-  // must complete before ERROR2 can be returned to its caller.
   FsErrorManager();
 
   // Sets the error notification callback.
@@ -123,14 +132,13 @@ class FsErrorManager {
 
   // Runs the error notification callback with the UUID of 'dir'.
   void RunErrorNotificationCb(ErrorHandlerType e, const DataDir* dir) const {
-    DCHECK_EQ(e, ErrorHandlerType::DISK);
+    DCHECK_EQ(e, ErrorHandlerType::DISK_ERROR);
     RunErrorNotificationCb(e, dir->instance()->metadata()->path_set().uuid());
   }
 
  private:
    // Callbacks to be run when an error occurs.
-  ErrorNotificationCb disk_cb_;
-  ErrorNotificationCb tablet_cb_;
+  std::unordered_map<ErrorHandlerType, ErrorNotificationCb, std::hash<int>> callbacks_;
 
    // Protects calls to notifications, enforcing that a single callback runs at
    // a time.

http://git-wip-us.apache.org/repos/asf/kudu/blob/6ec77092/src/kudu/fs/file_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/file_block_manager.cc b/src/kudu/fs/file_block_manager.cc
index 87fce9b..8fee5be 100644
--- a/src/kudu/fs/file_block_manager.cc
+++ b/src/kudu/fs/file_block_manager.cc
@@ -313,7 +313,7 @@ FileWritableBlock::~FileWritableBlock() {
 void FileWritableBlock::HandleError(const Status& s) const {
   HANDLE_DISK_FAILURE(
       s, block_manager_->error_manager()->RunErrorNotificationCb(
-          ErrorHandlerType::DISK, location_.data_dir()));
+          ErrorHandlerType::DISK_ERROR, location_.data_dir()));
 }
 
 Status FileWritableBlock::Close() {
@@ -474,7 +474,7 @@ void FileReadableBlock::HandleError(const Status& s) const {
   const DataDir* dir = block_manager_->dd_manager_->FindDataDirByUuidIndex(
       internal::FileBlockLocation::GetDataDirIdx(block_id_));
   HANDLE_DISK_FAILURE(s, block_manager_->error_manager()->RunErrorNotificationCb(
-      ErrorHandlerType::DISK, dir));
+      ErrorHandlerType::DISK_ERROR, dir));
 }
 
 FileReadableBlock::FileReadableBlock(FileBlockManager* block_manager,
@@ -673,7 +673,7 @@ Status FileBlockManager::SyncMetadata(const internal::FileBlockLocation& locatio
     for (const string& s : to_sync) {
       if (metrics_) metrics_->total_disk_sync->Increment();
       RETURN_NOT_OK_HANDLE_DISK_FAILURE(env_->SyncDir(s),
-          error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK,
+          error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR,
                                                  location.data_dir()));
     }
   }
@@ -749,7 +749,7 @@ Status FileBlockManager::CreateBlock(const CreateBlockOptions& opts,
 
   DataDir* dir;
   RETURN_NOT_OK_EVAL(dd_manager_->GetNextDataDir(opts, &dir),
-      error_manager_->RunErrorNotificationCb(ErrorHandlerType::TABLET, opts.tablet_id));
+      error_manager_->RunErrorNotificationCb(ErrorHandlerType::NO_AVAILABLE_DISKS, opts.tablet_id));
   int uuid_idx;
   CHECK(dd_manager_->FindUuidIndexByDataDir(dir, &uuid_idx));
 
@@ -785,7 +785,7 @@ Status FileBlockManager::CreateBlock(const CreateBlockOptions& opts,
     // no point in doing so. On disk failure, the tablet specified by 'opts'
     // will be shut down, so the returned block would not be used.
     RETURN_NOT_OK_HANDLE_DISK_FAILURE(s,
-        error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK, dir));
+        error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR, dir));
     WritableFileOptions wr_opts;
     wr_opts.mode = Env::CREATE_NON_EXISTING;
     s = env_util::OpenFileForWrite(wr_opts, env_, path, &writer);
@@ -805,7 +805,7 @@ Status FileBlockManager::CreateBlock(const CreateBlockOptions& opts,
     block->reset(new internal::FileWritableBlock(this, location, writer));
   } else {
     HANDLE_DISK_FAILURE(s,
-        error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK, dir));
+        error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR, dir));
     return s;
   }
   return Status::OK();
@@ -813,7 +813,7 @@ Status FileBlockManager::CreateBlock(const CreateBlockOptions& opts,
 
 #define RETURN_NOT_OK_FBM_DISK_FAILURE(status_expr) do { \
   RETURN_NOT_OK_HANDLE_DISK_FAILURE((status_expr), \
-      error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK, \
+      error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR, \
       dd_manager_->FindDataDirByUuidIndex( \
       internal::FileBlockLocation::GetDataDirIdx(block_id)))); \
 } while (0);

http://git-wip-us.apache.org/repos/asf/kudu/blob/6ec77092/src/kudu/fs/fs_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/fs_manager.cc b/src/kudu/fs/fs_manager.cc
index 93fd7f3..a9b954d 100644
--- a/src/kudu/fs/fs_manager.cc
+++ b/src/kudu/fs/fs_manager.cc
@@ -410,7 +410,7 @@ Status FsManager::Open(FsReport* report) {
   }
 
   // Set an initial error handler to mark data directories as failed.
-  error_manager_->SetErrorNotificationCb(ErrorHandlerType::DISK,
+  error_manager_->SetErrorNotificationCb(ErrorHandlerType::DISK_ERROR,
       Bind(&DataDirManager::MarkDataDirFailedByUuid, Unretained(dd_manager_.get())));
 
   // Finally, initialize and open the block manager.

http://git-wip-us.apache.org/repos/asf/kudu/blob/6ec77092/src/kudu/fs/log_block_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager-test.cc b/src/kudu/fs/log_block_manager-test.cc
index 3dc79e8..8c755f0 100644
--- a/src/kudu/fs/log_block_manager-test.cc
+++ b/src/kudu/fs/log_block_manager-test.cc
@@ -1633,7 +1633,7 @@ TEST_F(LogBlockManagerTest, TestOpenWithFailedDirectories) {
       DataDirManagerOptions(), &dd_manager_));
 
   // Wire in a callback to fail data directories.
-  test_error_manager_->SetErrorNotificationCb(ErrorHandlerType::DISK,
+  test_error_manager_->SetErrorNotificationCb(ErrorHandlerType::DISK_ERROR,
       Bind(&DataDirManager::MarkDataDirFailedByUuid, Unretained(dd_manager_.get())));
   bm_.reset(CreateBlockManager(nullptr));
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/6ec77092/src/kudu/fs/log_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index 86e8eaa..f1c66f5 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -621,12 +621,13 @@ LogBlockContainer::LogBlockContainer(
 
 void LogBlockContainer::HandleError(const Status& s) const {
   HANDLE_DISK_FAILURE(s,
-      block_manager()->error_manager()->RunErrorNotificationCb(ErrorHandlerType::DISK, data_dir_));
+      block_manager()->error_manager()->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR,
+                                                               data_dir_));
 }
 
 #define RETURN_NOT_OK_CONTAINER_DISK_FAILURE(status_expr) do { \
   RETURN_NOT_OK_HANDLE_DISK_FAILURE((status_expr), \
-    block_manager->error_manager()->RunErrorNotificationCb(ErrorHandlerType::DISK, dir)); \
+    block_manager->error_manager()->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR, dir)); \
 } while (0);
 
 Status LogBlockContainer::Create(LogBlockManager* block_manager,
@@ -691,8 +692,9 @@ Status LogBlockContainer::Create(LogBlockManager* block_manager,
 
   // Prefer metadata status (arbitrarily).
   FsErrorManager* em = block_manager->error_manager();
-  HANDLE_DISK_FAILURE(metadata_status, em->RunErrorNotificationCb(ErrorHandlerType::DISK, dir));
-  HANDLE_DISK_FAILURE(data_status, em->RunErrorNotificationCb(ErrorHandlerType::DISK, dir));
+  HANDLE_DISK_FAILURE(metadata_status, em->RunErrorNotificationCb(
+      ErrorHandlerType::DISK_ERROR, dir));
+  HANDLE_DISK_FAILURE(data_status, em->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR, dir));
   return !metadata_status.ok() ? metadata_status : data_status;
 }
 
@@ -1933,7 +1935,7 @@ Status LogBlockManager::GetOrCreateContainer(const CreateBlockOptions& opts,
                                              LogBlockContainer** container) {
   DataDir* dir;
   RETURN_NOT_OK_EVAL(dd_manager_->GetNextDataDir(opts, &dir),
-      error_manager_->RunErrorNotificationCb(ErrorHandlerType::TABLET, opts.tablet_id));
+      error_manager_->RunErrorNotificationCb(ErrorHandlerType::NO_AVAILABLE_DISKS, opts.tablet_id));
 
   {
     std::lock_guard<simple_spinlock> l(lock_);
@@ -1952,7 +1954,7 @@ Status LogBlockManager::GetOrCreateContainer(const CreateBlockOptions& opts,
   // We could create a container in a different directory, but there's
   // currently no point in doing so. On disk failure, the tablet specified by
   // 'opts' will be shut down, so the returned container would not be used.
-  HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK, dir));
+  HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR, dir));
   RETURN_NOT_OK_PREPEND(s, "Could not create new log block container at " + dir->dir());
   {
     std::lock_guard<simple_spinlock> l(lock_);
@@ -2133,7 +2135,7 @@ Status LogBlockManager::RemoveLogBlockUnlocked(const BlockId& block_id,
 
   LogBlockContainer* container = it->second->container();
   HANDLE_DISK_FAILURE(container->read_only_status(),
-      error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK, container->data_dir()));
+      error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR, container->data_dir()));
 
   // Return early if deleting a block in a failed directory.
   set<int> failed_dirs = dd_manager_->GetFailedDataDirs();
@@ -2187,7 +2189,8 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
   vector<string> children;
   Status s = env_->GetChildren(dir->dir(), &children);
   if (!s.ok()) {
-    HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK, dir));
+    HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb(
+        ErrorHandlerType::DISK_ERROR, dir));
     *result_status = s.CloneAndPrepend(Substitute(
         "Could not list children of $0", dir->dir()));
     return;
@@ -2340,7 +2343,8 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
       uint64_t reported_size;
       s = env_->GetFileSizeOnDisk(data_filename, &reported_size);
       if (!s.ok()) {
-        HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK, dir));
+        HANDLE_DISK_FAILURE(s, error_manager_->RunErrorNotificationCb(
+            ErrorHandlerType::DISK_ERROR, dir));
         *result_status = s.CloneAndPrepend(Substitute(
             "Could not get on-disk file size of container $0", container->ToString()));
         return;
@@ -2424,12 +2428,13 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
   Status s_ = (status_expr); \
   s_ = s_.CloneAndPrepend(msg); \
   RETURN_NOT_OK_HANDLE_DISK_FAILURE(s_, \
-      error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK, dir)); \
+      error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR, dir)); \
 } while (0);
 
 #define WARN_NOT_OK_LBM_DISK_FAILURE(status_expr, msg) do { \
   Status s_ = (status_expr); \
-  HANDLE_DISK_FAILURE(s_, error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK, dir)); \
+  HANDLE_DISK_FAILURE(s_, error_manager_->RunErrorNotificationCb( \
+      ErrorHandlerType::DISK_ERROR, dir)); \
   WARN_NOT_OK(s_, msg); \
 } while (0);
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/6ec77092/src/kudu/gutil/map-util.h
----------------------------------------------------------------------
diff --git a/src/kudu/gutil/map-util.h b/src/kudu/gutil/map-util.h
index dd4df19..82d266f 100644
--- a/src/kudu/gutil/map-util.h
+++ b/src/kudu/gutil/map-util.h
@@ -447,6 +447,23 @@ bool EmplaceIfNotPresent(Collection* const collection,
   return collection->emplace(std::forward<Args>(args)...).second;
 }
 
+// Emplaces the given key-value pair into the collection. Returns true if the
+// given key didn't previously exist. If the given key already existed in the
+// map, its value is changed to the given "value" and false is returned.
+template <class Collection>
+bool EmplaceOrUpdate(Collection* const collection,
+                     const typename Collection::key_type& key,
+                     typename Collection::mapped_type&& value) {
+  typedef typename Collection::mapped_type mapped_type;
+  auto it = collection->find(key);
+  if (it == collection->end()) {
+    collection->emplace(key, std::forward<mapped_type>(value));
+    return true;
+  }
+  it->second = std::forward<mapped_type>(value);
+  return false;
+}
+
 template <class Collection, class... Args>
 void EmplaceOrDie(Collection* const collection,
                   Args&&... args) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/6ec77092/src/kudu/tserver/tablet_server.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server.cc b/src/kudu/tserver/tablet_server.cc
index 068408b..f7a271c 100644
--- a/src/kudu/tserver/tablet_server.cc
+++ b/src/kudu/tserver/tablet_server.cc
@@ -115,7 +115,7 @@ Status TabletServer::WaitInited() {
 Status TabletServer::Start() {
   CHECK(initted_);
 
-  fs_manager_->SetErrorNotificationCb(ErrorHandlerType::DISK,
+  fs_manager_->SetErrorNotificationCb(ErrorHandlerType::DISK_ERROR,
       Bind(&TSTabletManager::FailTabletsInDataDir, Unretained(tablet_manager_.get())));
 
   gscoped_ptr<ServiceIf> ts_service(new TabletServiceImpl(this));
@@ -149,7 +149,7 @@ void TabletServer::Shutdown() {
     // 2. Shut down the tserver's subsystems.
     maintenance_manager_->Shutdown();
     WARN_NOT_OK(heartbeater_->Stop(), "Failed to stop TS Heartbeat thread");
-    fs_manager_->UnsetErrorNotificationCb(ErrorHandlerType::DISK);
+    fs_manager_->UnsetErrorNotificationCb(ErrorHandlerType::DISK_ERROR);
     tablet_manager_->Shutdown();
 
     // 3. Shut down generic subsystems.

http://git-wip-us.apache.org/repos/asf/kudu/blob/6ec77092/src/kudu/util/map-util-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/map-util-test.cc b/src/kudu/util/map-util-test.cc
index 3aa9448..b074007 100644
--- a/src/kudu/util/map-util-test.cc
+++ b/src/kudu/util/map-util-test.cc
@@ -104,13 +104,23 @@ TEST(EraseKeyReturnValuePtrTest, TestRawAndSmartSmartPointers) {
 }
 
 TEST(EmplaceTest, TestEmplace) {
+  string key1("k");
+  string key2("k2");
   // Map with move-only value type.
   map<string, unique_ptr<string>> my_map;
   unique_ptr<string> val(new string("foo"));
-  ASSERT_TRUE(EmplaceIfNotPresent(&my_map, "k", std::move(val)));
-  ASSERT_TRUE(ContainsKey(my_map, "k"));
-  ASSERT_FALSE(EmplaceIfNotPresent(&my_map, "k", nullptr))
+  ASSERT_TRUE(EmplaceIfNotPresent(&my_map, key1, std::move(val)));
+  ASSERT_TRUE(ContainsKey(my_map, key1));
+  ASSERT_FALSE(EmplaceIfNotPresent(&my_map, key1, nullptr))
       << "Should return false for already-present";
+
+  val = unique_ptr<string>(new string("bar"));
+  ASSERT_TRUE(EmplaceOrUpdate(&my_map, key2, std::move(val)));
+  ASSERT_TRUE(ContainsKey(my_map, key2));
+  ASSERT_EQ("bar", *FindOrDie(my_map, key2));
+  val = unique_ptr<string>(new string("foobar"));
+  ASSERT_FALSE(EmplaceOrUpdate(&my_map, key2, std::move(val)));
+  ASSERT_EQ("foobar", *FindOrDie(my_map, key2));
 }
 
 } // namespace kudu