You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2016/04/27 03:10:56 UTC

[1/3] incubator-kudu git commit: KUDU-495 (part 2): ensure all catalog writes for an operation are batched

Repository: incubator-kudu
Updated Branches:
  refs/heads/master 938f7feaf -> 73b8cc07e


KUDU-495 (part 2): ensure all catalog writes for an operation are batched

The motivation is simple: by combining all catalog writes for a single
logical operation into one write, we can safely downgrade the CHECK_OK in
DeleteTable() to a runtime failure, and we can get away with absolutely no
"roll forward" repair of on-disk metadata when it is reloaded. For this to
work, we need all of the master metadata to be in a single tablet, and that
was done a long time ago; all that remained was the previous change to the
sys_catalog API.

Let's start with DeleteTable(). The disparate writes have been combined into
one. The order of operations has been changed to be safe, with in-memory
modifications taking place only after the write has succeeded. Doing this
requires that we commit tablet mutations before table mutations, and forces
a change to tablet iteration order in ExtractTabletsToProcess(). I also
audited all tablet readers to make sure they're OK with seeing a deleted
tablet before its table (they are).

The disparate writes in CreateTable() have also been combined. The operation
is still not completely safe (some in-memory changes take place before the
write succeeds); I will tackle that in a follow-on patch.

To test this, I used a combination of white box and black box methods.
First, I extended fault injection to return errors and hooked that up to
sys_catalog. The new test enables fault injection, then performs a bunch of
random operations as if it were a client, coping with failures as they
arise. When it's done, it does a verification pass on the master metadata
using the sys_catalog visitor hooks.

Change-Id: I5cbccf5ce22c005d7aa25bbdefe7502873a8ed7d
Reviewed-on: http://gerrit.cloudera.org:8080/2695
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/af8a4af0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/af8a4af0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/af8a4af0

Branch: refs/heads/master
Commit: af8a4af09175e22ca6d0f24c244c2700ed88e436
Parents: 938f7fe
Author: Adar Dembo <ad...@cloudera.com>
Authored: Thu Mar 31 19:49:36 2016 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Wed Apr 27 01:07:59 2016 +0000

----------------------------------------------------------------------
 src/kudu/master/catalog_manager.cc | 456 +++++++++++++++++---------------
 src/kudu/master/catalog_manager.h  |  38 +--
 src/kudu/master/master-test.cc     | 307 +++++++++++++++++++++
 src/kudu/master/sys_catalog.cc     |  11 +
 src/kudu/master/sys_catalog.h      |   6 +
 src/kudu/util/fault_injection.cc   |   9 +
 src/kudu/util/fault_injection.h    |  29 +-
 7 files changed, 619 insertions(+), 237 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/af8a4af0/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 0322d7d..71f51fb 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -32,7 +32,12 @@
 //   since they only conflict with COMMIT which is a purely in-memory operation.
 //   Thus they are deadlock-free.
 // - If you need a WRITE lock on both a table and one or more of its tablets,
-//   acquire the lock on the table first. This strict ordering prevents deadlocks.
+//   acquire the lock on the table first, and acquire the locks on the tablets
+//   in tablet ID order, or let ScopedTabletInfoCommitter do the locking. This
+//   strict ordering prevents deadlocks. Along the same lines, COMMIT must
+//   happen in reverse (i.e. the tablet lock must be committed before the table
+//   lock). The only exceptions to this are when there's only one thread in
+//   operation, such as during master failover.
 
 #include "kudu/master/catalog_manager.h"
 
@@ -43,6 +48,7 @@
 #include <glog/logging.h>
 
 #include <algorithm>
+#include <memory>
 #include <set>
 #include <string>
 #include <vector>
@@ -147,6 +153,7 @@ TAG_FLAG(catalog_manager_check_ts_count_for_create_table, hidden);
 
 using std::shared_ptr;
 using std::string;
+using std::unique_ptr;
 using std::vector;
 
 namespace kudu {
@@ -188,20 +195,20 @@ class TableLoader : public TableVisitor {
     CHECK(!ContainsKey(catalog_manager_->table_ids_map_, table_id))
           << "Table already exists: " << table_id;
 
-    // Setup the table info
+    // Set up the table info.
     TableInfo *table = new TableInfo(table_id);
     TableMetadataLock l(table, TableMetadataLock::WRITE);
     l.mutable_data()->pb.CopyFrom(metadata);
 
-    // Add the tablet to the IDs map and to the name map (if the table is not deleted)
+    // Add the table to the IDs map and to the name map (if the table is not deleted).
     catalog_manager_->table_ids_map_[table->id()] = table;
     if (!l.data().is_deleted()) {
       catalog_manager_->table_names_map_[l.data().name()] = table;
     }
+    l.Commit();
 
     LOG(INFO) << "Loaded metadata for table " << table->ToString();
     VLOG(1) << "Metadata for table " << table->ToString() << ": " << metadata.ShortDebugString();
-    l.Commit();
     return Status::OK();
   }
 
@@ -224,49 +231,34 @@ class TabletLoader : public TabletVisitor {
   virtual Status VisitTablet(const std::string& table_id,
                              const std::string& tablet_id,
                              const SysTabletsEntryPB& metadata) OVERRIDE {
-    // Lookup the table
+    // Look up the table.
     scoped_refptr<TableInfo> table(FindPtrOrNull(
-                                     catalog_manager_->table_ids_map_, table_id));
+        catalog_manager_->table_ids_map_, table_id));
+    if (table == nullptr) {
+      // Tables and tablets are always created/deleted in one operation, so
+      // this shouldn't be possible.
+      LOG(ERROR) << "Missing Table " << table_id << " required by tablet " << tablet_id;
+      LOG(ERROR) << "Metadata: " << metadata.DebugString();
+      return Status::Corruption("Missing table for tablet: ", tablet_id);
+    }
 
-    // Setup the tablet info
+    // Set up the tablet info.
     TabletInfo* tablet = new TabletInfo(table, tablet_id);
     TabletMetadataLock l(tablet, TabletMetadataLock::WRITE);
     l.mutable_data()->pb.CopyFrom(metadata);
 
-    // Add the tablet to the tablet manager
+    // Add the tablet to the tablet manager.
     catalog_manager_->tablet_map_[tablet->tablet_id()] = tablet;
 
-    if (table == nullptr) {
-      // if the table is missing and the tablet is in "preparing" state
-      // may mean that the table was not created (maybe due to a failed write
-      // for the sys-tablets). The cleaner will remove
-      if (l.data().pb.state() == SysTabletsEntryPB::PREPARING) {
-        LOG(WARNING) << "Missing Table " << table_id << " required by tablet " << tablet_id
-                     << " (probably a failed table creation: the tablet was not assigned)";
-        return Status::OK();
-      }
-
-      // if the tablet is not in a "preparing" state, something is wrong...
-      LOG(ERROR) << "Missing Table " << table_id << " required by tablet " << tablet_id;
-      LOG(ERROR) << "Metadata: " << metadata.DebugString();
-      return Status::Corruption("Missing table for tablet: ", tablet_id);
-    }
-
-    // Add the tablet to the Table
+    // Add the tablet to the table.
     if (!l.mutable_data()->is_deleted()) {
       table->AddTablet(tablet);
     }
     l.Commit();
 
-    // TODO(KUDU-1070): if we see a running tablet under a deleted table,
-    // we should "roll forward" the deletion of the tablet here.
-
-    TableMetadataLock table_lock(table.get(), TableMetadataLock::READ);
-
     LOG(INFO) << "Loaded metadata for tablet " << tablet_id
               << " (table " << table->ToString() << ")";
     VLOG(2) << "Metadata for tablet " << tablet_id << ": " << metadata.ShortDebugString();
-
     return Status::OK();
   }
 
@@ -351,11 +343,10 @@ void CatalogManagerBgTasks::Run() {
     if (!catalog_manager_->IsInitialized()) {
       LOG(WARNING) << "Catalog manager is not initialized!";
     } else if (catalog_manager_->CheckIsLeaderAndReady().ok()) {
-      std::vector<scoped_refptr<TabletInfo> > to_delete;
-      std::vector<scoped_refptr<TabletInfo> > to_process;
+      std::vector<scoped_refptr<TabletInfo>> to_process;
 
-      // Get list of tablets not yet running or already replaced.
-      catalog_manager_->ExtractTabletsToProcess(&to_delete, &to_process);
+      // Get list of tablets not yet running.
+      catalog_manager_->ExtractTabletsToProcess(&to_process);
 
       if (!to_process.empty()) {
         // Transition tablet assignment state from preparing to creating, send
@@ -375,10 +366,6 @@ void CatalogManagerBgTasks::Run() {
       VLOG(1) << "We are no longer the leader, aborting the current task...";
     }
 
-    //if (!to_delete.empty()) {
-      // TODO: Run the cleaner
-    //}
-
     // Wait for a notification or a timeout expiration.
     //  - CreateTable will call Wake() to notify about the tablets to add
     //  - HandleReportedTablet/ProcessPendingAssignments will call WakeIfHasPendingUpdates()
@@ -394,6 +381,102 @@ void CatalogManagerBgTasks::Run() {
 
 namespace {
 
+// Tracks TabletInfo mutations and either commits or aborts them.
+//
+// Can be used in one of two ways:
+// 1. To track already-locked TabletInfos and commit them on end of scope:
+//      {
+//        ScopedTabletInfoCommitter c(ScopedTabletInfoCommitter::LOCKED);
+//        c.AddTablets({ one, two, three });
+//        <Perform mutations>
+//      } // Mutations are committed
+//
+// 2. To aggregate unlocked TabletInfos, lock them safely, and commit them on end of scope:
+//      {
+//        ScopedTabletInfoCommitter c(ScopedTabletInfoCommitter::UNLOCKED);
+//        c.AddTablets({ five, two, three });
+//        c.AddTablets({ four, one });
+//        c.LockTabletsForWriting();
+//        <Perform mutations>
+//      } // Mutations are committed
+//
+// The acquisition or release of multiple tablet locks is done in tablet ID
+// order, as required by the locking rules (see the top of the file).
+class ScopedTabletInfoCommitter {
+ private:
+  // Compares TabletInfos using their underlying tablet IDs.
+  struct TabletInfoCompare {
+    bool operator() (const scoped_refptr<TabletInfo>& left,
+                     const scoped_refptr<TabletInfo>& right) const {
+      return left->tablet_id() < right->tablet_id();
+    }
+  };
+
+  // Must be defined before begin()/end() below.
+  typedef set<scoped_refptr<TabletInfo>, TabletInfoCompare> TabletSet;
+
+ public:
+  // Whether tablets added to this committer have been locked already or
+  // should be locked by the committer itself.
+  enum State {
+    LOCKED,
+    UNLOCKED,
+  };
+
+  explicit ScopedTabletInfoCommitter(State state)
+    : state_(state),
+      aborted_(false) {
+  }
+
+  // Acquire write locks for all of the tablets previously added.
+  void LockTabletsForWriting() {
+    DCHECK_EQ(UNLOCKED, state_);
+    for (const auto& t : tablets_) {
+      t->mutable_metadata()->StartMutation();
+    }
+    state_ = LOCKED;
+  }
+
+  // Release all write locks, discarding any mutated tablet data.
+  void Abort() {
+    DCHECK(!aborted_);
+    if (state_ == LOCKED) {
+      for (const auto & t : tablets_) {
+        t->mutable_metadata()->AbortMutation();
+      }
+    }
+    aborted_ = true;
+  }
+
+  // Release all write locks, committing any mutated tablet data.
+  ~ScopedTabletInfoCommitter() {
+    if (PREDICT_TRUE(!aborted_ && state_ == LOCKED)) {
+      for (const auto& t : tablets_) {
+        t->mutable_metadata()->CommitMutation();
+      }
+    }
+  }
+
+  // Add new tablets to be tracked.
+  void AddTablets(const vector<scoped_refptr<TabletInfo>>& new_tablets) {
+    DCHECK(!aborted_);
+    tablets_.insert(new_tablets.begin(), new_tablets.end());
+  }
+
+  // These methods allow the class to be used in range-based for loops.
+  const TabletSet::iterator begin() const {
+    return tablets_.begin();
+  }
+  const TabletSet::iterator end() const {
+    return tablets_.end();
+  }
+
+ private:
+  TabletSet tablets_;
+  State state_;
+  bool aborted_;
+};
+
 string RequestorString(RpcContext* rpc) {
   if (rpc) {
     return rpc->requestor_string();
@@ -828,37 +911,24 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req,
   for (const TabletInfo *tablet : tablets) {
     CHECK_EQ(SysTabletsEntryPB::PREPARING, tablet->metadata().dirty().pb.state());
   }
-
-  // e. Write Tablets to sys-tablets (in "preparing" state)
-  SysCatalogTable::Actions tablet_actions;
-  tablet_actions.tablets_to_add = tablets;
-  s = sys_catalog_->Write(tablet_actions);
-  if (!s.ok()) {
-    s = s.CloneAndPrepend(Substitute("An error occurred while inserting to sys-tablets: $0",
-                                     s.ToString()));
-    LOG(WARNING) << s.ToString();
-    AbortTableCreation(table.get(), tablets);
-    CheckIfNoLongerLeaderAndSetupError(s, resp);
-    return s;
-  }
-  TRACE("Wrote tablets to system table");
-
-  // f. Update the on-disk table state to "running".
   table->mutable_metadata()->mutable_dirty()->pb.set_state(SysTablesEntryPB::RUNNING);
-  SysCatalogTable::Actions table_actions;
-  table_actions.table_to_add = table.get();
-  s = sys_catalog_->Write(table_actions);
+
+  // e. Write table and tablets to sys-catalog.
+  SysCatalogTable::Actions actions;
+  actions.table_to_add = table.get();
+  actions.tablets_to_add = tablets;
+  s = sys_catalog_->Write(actions);
   if (!s.ok()) {
-    s = s.CloneAndPrepend(Substitute("An error occurred while inserting to sys-tablets: $0",
+    s = s.CloneAndPrepend(Substitute("An error occurred while writing to sys-catalog: $0",
                                      s.ToString()));
     LOG(WARNING) << s.ToString();
     AbortTableCreation(table.get(), tablets);
     CheckIfNoLongerLeaderAndSetupError(s, resp);
     return s;
   }
-  TRACE("Wrote table to system table");
+  TRACE("Wrote table and tablets to system table");
 
-  // g. Commit the in-memory state.
+  // f. Commit the in-memory state.
   table->mutable_metadata()->CommitMutation();
 
   for (TabletInfo *tablet : tablets) {
@@ -940,12 +1010,6 @@ Status CatalogManager::FindTable(const TableIdentifierPB& table_identifier,
   return Status::OK();
 }
 
-// Delete a Table
-//  - Update the table state to "removed"
-//  - Write the updated table metadata to sys-table
-//
-// we are lazy about deletions...
-// the cleaner will remove tables and tablets marked as "removed"
 Status CatalogManager::DeleteTable(const DeleteTableRequestPB* req,
                                    DeleteTableResponsePB* resp,
                                    rpc::RpcContext* rpc) {
@@ -954,10 +1018,9 @@ Status CatalogManager::DeleteTable(const DeleteTableRequestPB* req,
 
   RETURN_NOT_OK(CheckOnline());
 
-  scoped_refptr<TableInfo> table;
-
-  // 1. Lookup the table and verify if it exists
+  // 1. Look up the table, lock it, and mark it as removed.
   TRACE("Looking up table");
+  scoped_refptr<TableInfo> table;
   RETURN_NOT_OK(FindTable(req->table(), &table));
   if (table == nullptr) {
     Status s = Status::NotFound("The table does not exist", req->table().DebugString());
@@ -973,45 +1036,68 @@ Status CatalogManager::DeleteTable(const DeleteTableRequestPB* req,
     return s;
   }
 
-  TRACE("Updating metadata on disk");
-  // 2. Update the metadata for the on-disk state
-  l.mutable_data()->set_state(SysTablesEntryPB::REMOVED,
-                              Substitute("Deleted at $0", LocalTimeAsString()));
-
-  // 3. Update sys-catalog with the removed table state.
-  SysCatalogTable::Actions actions;
-  actions.table_to_update = table.get();
-  Status s = sys_catalog_->Write(actions);
-  if (!s.ok()) {
-    // The mutation will be aborted when 'l' exits the scope on early return.
-    s = s.CloneAndPrepend(Substitute("An error occurred while updating sys tables: $0",
-                                     s.ToString()));
-    LOG(WARNING) << s.ToString();
-    CheckIfNoLongerLeaderAndSetupError(s, resp);
-    return s;
-  }
+  TRACE("Modifying in-memory table state")
+  string deletion_msg = "Table deleted at " + LocalTimeAsString();
+  l.mutable_data()->set_state(SysTablesEntryPB::REMOVED, deletion_msg);
 
-  // 4. Remove it from the by-name map
+  // 2. Look up the tablets, lock them, and mark them as deleted.
   {
-    TRACE("Removing from by-name map");
-    boost::lock_guard<LockType> l_map(lock_);
-    if (table_names_map_.erase(l.data().name()) != 1) {
-      PANIC_RPC(rpc, "Could not remove table from map, name=" + l.data().name());
+    ScopedTabletInfoCommitter committer(ScopedTabletInfoCommitter::UNLOCKED);
+    TRACE("Locking tablets");
+    vector<scoped_refptr<TabletInfo>> tablets;
+    table->GetAllTablets(&tablets);
+    committer.AddTablets(tablets);
+    committer.LockTabletsForWriting();
+
+    vector<TabletInfo*> tablets_raw;
+    for (const auto& t : committer) {
+      t->mutable_metadata()->mutable_dirty()->set_state(
+          SysTabletsEntryPB::DELETED, deletion_msg);
+      tablets_raw.push_back(t.get());
+    }
+
+    // 3. Update sys-catalog with the removed table and tablet state.
+    TRACE("Removing table and tablets from system table");
+    SysCatalogTable::Actions actions;
+    actions.table_to_update = table.get();
+    actions.tablets_to_update = tablets_raw;
+    Status s = sys_catalog_->Write(actions);
+    if (!s.ok()) {
+      s = s.CloneAndPrepend(Substitute("An error occurred while updating sys tables: $0",
+                                       s.ToString()));
+      LOG(WARNING) << s.ToString();
+      CheckIfNoLongerLeaderAndSetupError(s, resp);
+      committer.Abort();
+      return s;
     }
-  }
 
-  table->AbortTasks();
+    // The operation has been written to sys-catalog; now it must succeed.
+
+    // 4. Remove the table from the by-name map.
+    {
+      TRACE("Removing table from by-name map");
+      boost::lock_guard<LockType> l_map(lock_);
+      if (table_names_map_.erase(l.data().name()) != 1) {
+        PANIC_RPC(rpc, "Could not remove table from map, name=" + l.data().name());
+      }
+    }
 
-  // 5. Update the in-memory state
+    // 5. Commit the dirty tablet state (on end of scope).
+  }
+
+  // 6. Commit the dirty table state.
   TRACE("Committing in-memory state");
   l.Commit();
 
-  // Send a DeleteTablet() request to each tablet replica in the table.
-  DeleteTabletsAndSendRequests(table);
+  // 7. Abort any extant tasks belonging to the table.
+  TRACE("Aborting table tasks");
+  table->AbortTasks();
+
+  // 8. Send a DeleteTablet() request to each tablet replica in the table.
+  SendDeleteTableRequest(table, deletion_msg);
 
   LOG(INFO) << "Successfully deleted table " << table->ToString()
             << " per request from " << RequestorString(rpc);
-  background_tasks_->Wake();
   return Status::OK();
 }
 
@@ -1108,10 +1194,9 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
 
   RETURN_NOT_OK(CheckOnline());
 
-  scoped_refptr<TableInfo> table;
-
   // 1. Lookup the table and verify if it exists
   TRACE("Looking up table");
+  scoped_refptr<TableInfo> table;
   RETURN_NOT_OK(FindTable(req->table(), &table));
   if (table == nullptr) {
     Status s = Status::NotFound("The table does not exist", req->table().DebugString());
@@ -1201,8 +1286,6 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
       CHECK_EQ(table_names_map_.erase(req->new_table_name()), 1);
     }
     CheckIfNoLongerLeaderAndSetupError(s, resp);
-    // TableMetadaLock follows RAII paradigm: when it leaves scope,
-    // 'l' will be unlocked, and the mutation will be aborted.
     return s;
   }
 
@@ -1411,14 +1494,14 @@ Status CatalogManager::HandleReportedTablet(TSDescriptor* ts_desc,
   if (!tablet) {
     LOG(INFO) << "Got report from unknown tablet " << report.tablet_id()
               << ": Sending delete request for this orphan tablet";
-    SendDeleteTabletRequest(report.tablet_id(), TABLET_DATA_DELETED, boost::none, nullptr, ts_desc,
-                            "Report from unknown tablet");
+    SendDeleteReplicaRequest(report.tablet_id(), TABLET_DATA_DELETED, boost::none, nullptr, ts_desc,
+                             "Report from unknown tablet");
     return Status::OK();
   }
   if (!tablet->table()) {
     LOG(INFO) << "Got report from an orphaned tablet " << report.tablet_id();
-    SendDeleteTabletRequest(report.tablet_id(), TABLET_DATA_DELETED, boost::none, nullptr, ts_desc,
-                            "Report from an orphaned tablet");
+    SendDeleteReplicaRequest(report.tablet_id(), TABLET_DATA_DELETED, boost::none, nullptr, ts_desc,
+                             "Report from an orphaned tablet");
     return Status::OK();
   }
   VLOG(3) << "tablet report: " << report.ShortDebugString();
@@ -1430,9 +1513,6 @@ Status CatalogManager::HandleReportedTablet(TSDescriptor* ts_desc,
 
   // If the TS is reporting a tablet which has been deleted, or a tablet from
   // a table which has been deleted, send it an RPC to delete it.
-  // NOTE: when a table is deleted, we don't currently iterate over all of the
-  // tablets and mark them as deleted. Hence, we have to check the table state,
-  // not just the tablet state.
   if (tablet_lock.data().is_deleted() ||
       table_lock.data().is_deleted()) {
     report_updates->set_state_msg(tablet_lock.data().pb.state_msg());
@@ -1441,9 +1521,8 @@ Status CatalogManager::HandleReportedTablet(TSDescriptor* ts_desc,
               << " (" << msg << "): Sending delete request for this tablet";
     // TODO: Cancel tablet creation, instead of deleting, in cases where
     // that might be possible (tablet creation timeout & replacement).
-    SendDeleteTabletRequest(tablet->tablet_id(), TABLET_DATA_DELETED, boost::none,
-                            tablet->table(), ts_desc,
-                            Substitute("Tablet deleted: $0", msg));
+    SendDeleteReplicaRequest(tablet->tablet_id(), TABLET_DATA_DELETED, boost::none,
+                             tablet->table(), ts_desc, msg);
     return Status::OK();
   }
 
@@ -1505,11 +1584,11 @@ Status CatalogManager::HandleReportedTablet(TSDescriptor* ts_desc,
     if (FLAGS_master_tombstone_evicted_tablet_replicas &&
         cstate.config().opid_index() < prev_cstate.config().opid_index() &&
         !IsRaftConfigMember(ts_desc->permanent_uuid(), prev_cstate.config())) {
-      SendDeleteTabletRequest(report.tablet_id(), TABLET_DATA_TOMBSTONED,
-                              prev_cstate.config().opid_index(), tablet->table(), ts_desc,
-                              Substitute("Replica from old config with index $0 (latest is $1)",
-                                         cstate.config().opid_index(),
-                                         prev_cstate.config().opid_index()));
+      SendDeleteReplicaRequest(report.tablet_id(), TABLET_DATA_TOMBSTONED,
+                               prev_cstate.config().opid_index(), tablet->table(), ts_desc,
+                               Substitute("Replica from old config with index $0 (latest is $1)",
+                                          cstate.config().opid_index(),
+                                          prev_cstate.config().opid_index()));
       return Status::OK();
     }
 
@@ -1611,6 +1690,7 @@ Status CatalogManager::HandleReportedTablet(TSDescriptor* ts_desc,
   if (!s.ok()) {
     LOG(WARNING) << "Error updating tablets: " << s.ToString() << ". Tablet report was: "
                  << report.ShortDebugString();
+    // TODO: we should undo the in-memory tablet replica locations changes made above.
     return s;
   }
   tablet_lock.Commit();
@@ -1668,10 +1748,10 @@ Status CatalogManager::ResetTabletReplicasFromReportedConfig(
       if (!ContainsKey(current_member_uuids, peer_uuid)) {
         shared_ptr<TSDescriptor> ts_desc;
         if (!master_->ts_manager()->LookupTSByUUID(peer_uuid, &ts_desc)) continue;
-        SendDeleteTabletRequest(report.tablet_id(), TABLET_DATA_TOMBSTONED,
-                                prev_cstate.config().opid_index(), tablet->table(), ts_desc.get(),
-                                Substitute("TS $0 not found in new config with opid_index $1",
-                                           peer_uuid, cstate.config().opid_index()));
+        SendDeleteReplicaRequest(report.tablet_id(), TABLET_DATA_TOMBSTONED,
+                                 prev_cstate.config().opid_index(), tablet->table(), ts_desc.get(),
+                                 Substitute("TS $0 not found in new config with opid_index $1",
+                                            peer_uuid, cstate.config().opid_index()));
       }
     }
   }
@@ -2436,38 +2516,30 @@ void CatalogManager::SendAlterTabletRequest(const scoped_refptr<TabletInfo>& tab
   WARN_NOT_OK(call->Run(), "Failed to send alter table request");
 }
 
-void CatalogManager::DeleteTabletReplicas(
-    const TabletInfo* tablet,
-    const std::string& msg) {
-  TabletInfo::ReplicaMap locations;
-  tablet->GetReplicaLocations(&locations);
-  LOG(INFO) << "Sending DeleteTablet for " << locations.size()
-            << " replicas of tablet " << tablet->tablet_id();
-  for (const TabletInfo::ReplicaMap::value_type& r : locations) {
-    SendDeleteTabletRequest(tablet->tablet_id(), TABLET_DATA_DELETED,
-                            boost::none, tablet->table(), r.second.ts_desc, msg);
-  }
-}
-
-void CatalogManager::DeleteTabletsAndSendRequests(const scoped_refptr<TableInfo>& table) {
+void CatalogManager::SendDeleteTableRequest(const scoped_refptr<TableInfo>& table,
+                                            const string& deletion_msg) {
   vector<scoped_refptr<TabletInfo> > tablets;
   table->GetAllTablets(&tablets);
 
-  string deletion_msg = "Table deleted at " + LocalTimeAsString();
-
   for (const scoped_refptr<TabletInfo>& tablet : tablets) {
-    DeleteTabletReplicas(tablet.get(), deletion_msg);
+    SendDeleteTabletRequest(tablet, deletion_msg);
+  }
+}
 
-    TabletMetadataLock tablet_lock(tablet.get(), TabletMetadataLock::WRITE);
-    tablet_lock.mutable_data()->set_state(SysTabletsEntryPB::DELETED, deletion_msg);
-    SysCatalogTable::Actions actions;
-    actions.tablets_to_update.push_back(tablet.get());
-    CHECK_OK(sys_catalog_->Write(actions));
-    tablet_lock.Commit();
+void CatalogManager::SendDeleteTabletRequest(const scoped_refptr<TabletInfo>& tablet,
+                                             const string& deletion_msg) {
+  TabletInfo::ReplicaMap locations;
+  tablet->GetReplicaLocations(&locations);
+
+  LOG(INFO) << "Sending DeleteTablet for " << locations.size()
+            << " replicas of tablet " << tablet->tablet_id();
+  for (const TabletInfo::ReplicaMap::value_type& r : locations) {
+    SendDeleteReplicaRequest(tablet->tablet_id(), TABLET_DATA_DELETED,
+                             boost::none, tablet->table(), r.second.ts_desc, deletion_msg);
   }
 }
 
-void CatalogManager::SendDeleteTabletRequest(
+void CatalogManager::SendDeleteReplicaRequest(
     const std::string& tablet_id,
     TabletDataState delete_type,
     const boost::optional<int64_t>& cas_config_opid_index_less_or_equal,
@@ -2505,8 +2577,7 @@ void CatalogManager::SendAddServerRequest(const scoped_refptr<TabletInfo>& table
 }
 
 void CatalogManager::ExtractTabletsToProcess(
-    std::vector<scoped_refptr<TabletInfo> > *tablets_to_delete,
-    std::vector<scoped_refptr<TabletInfo> > *tablets_to_process) {
+    vector<scoped_refptr<TabletInfo>>* tablets_to_process) {
 
   boost::shared_lock<LockType> l(lock_);
 
@@ -2515,31 +2586,25 @@ void CatalogManager::ExtractTabletsToProcess(
   //       or just a counter to avoid to take the lock and loop through the tablets
   //       if everything is "stable".
 
-  for (const TabletInfoMap::value_type& entry : tablet_map_) {
-    scoped_refptr<TabletInfo> tablet = entry.second;
-    TabletMetadataLock tablet_lock(tablet.get(), TabletMetadataLock::READ);
-
-    if (!tablet->table()) {
-      // Tablet is orphaned or in preparing state, continue.
-      continue;
-    }
-
-    TableMetadataLock table_lock(tablet->table().get(), TableMetadataLock::READ);
-
-    // If the table is deleted or the tablet was replaced at table creation time.
-    if (tablet_lock.data().is_deleted() || table_lock.data().is_deleted()) {
-      tablets_to_delete->push_back(tablet);
+  // 'tablets_to_process' elements must be partially ordered in the same way as
+  // table->GetAllTablets(); see the locking rules at the top of the file.
+  for (const auto& table_entry : table_ids_map_) {
+    scoped_refptr<TableInfo> table = table_entry.second;
+    TableMetadataLock table_lock(table.get(), TableMetadataLock::READ);
+    if (table_lock.data().is_deleted()) {
       continue;
     }
 
-    // Running tablets.
-    if (tablet_lock.data().is_running()) {
-      // TODO: handle last update > not responding timeout?
-      continue;
+    vector<scoped_refptr<TabletInfo>> tablets;
+    table->GetAllTablets(&tablets);
+    for (const auto& tablet : tablets) {
+      TabletMetadataLock tablet_lock(tablet.get(), TabletMetadataLock::READ);
+      if (tablet_lock.data().is_deleted() ||
+          tablet_lock.data().is_running()) {
+        continue;
+      }
+      tablets_to_process->push_back(tablet);
     }
-
-    // Tablets not yet assigned or with a report just received
-    tablets_to_process->push_back(tablet);
   }
 }
 
@@ -2649,56 +2714,20 @@ Status CatalogManager::HandleTabletSchemaVersionReport(TabletInfo *tablet, uint3
   return Status::OK();
 }
 
-// Helper class to commit TabletInfo mutations at the end of a scope.
-namespace {
-
-class ScopedTabletInfoCommitter {
- public:
-  explicit ScopedTabletInfoCommitter(const std::vector<scoped_refptr<TabletInfo> >* tablets)
-    : tablets_(DCHECK_NOTNULL(tablets)),
-      aborted_(false) {
-  }
-
-  // This method is not thread safe. Must be called by the same thread
-  // that would destroy this instance.
-  void Abort() {
-    for (const scoped_refptr<TabletInfo>& tablet : *tablets_) {
-      tablet->mutable_metadata()->AbortMutation();
-    }
-    aborted_ = true;
-  }
-
-  // Commit the transactions.
-  ~ScopedTabletInfoCommitter() {
-    if (PREDICT_TRUE(!aborted_)) {
-      for (const scoped_refptr<TabletInfo>& tablet : *tablets_) {
-        tablet->mutable_metadata()->CommitMutation();
-      }
-    }
-  }
-
- private:
-  const std::vector<scoped_refptr<TabletInfo> >* tablets_;
-  bool aborted_;
-};
-} // anonymous namespace
-
 Status CatalogManager::ProcessPendingAssignments(
     const std::vector<scoped_refptr<TabletInfo> >& tablets) {
   VLOG(1) << "Processing pending assignments";
 
   // Take write locks on all tablets to be processed, and ensure that they are
   // unlocked at the end of this scope.
-  for (const scoped_refptr<TabletInfo>& tablet : tablets) {
-    tablet->mutable_metadata()->StartMutation();
-  }
-  ScopedTabletInfoCommitter unlocker_in(&tablets);
+  ScopedTabletInfoCommitter unlocker_in(ScopedTabletInfoCommitter::UNLOCKED);
+  unlocker_in.AddTablets(tablets);
+  unlocker_in.LockTabletsForWriting();
 
   // Any tablets created by the helper functions will also be created in a
   // locked state, so we must ensure they are unlocked before we return to
   // avoid deadlocks.
-  std::vector<scoped_refptr<TabletInfo> > new_tablets;
-  ScopedTabletInfoCommitter unlocker_out(&new_tablets);
+  ScopedTabletInfoCommitter unlocker_out(ScopedTabletInfoCommitter::LOCKED);
 
   DeferredAssignmentActions deferred;
 
@@ -2714,9 +2743,12 @@ Status CatalogManager::ProcessPendingAssignments(
         break;
 
       case SysTabletsEntryPB::CREATING:
+      {
+        vector<scoped_refptr<TabletInfo>> new_tablets;
         HandleAssignCreatingTablet(tablet.get(), &deferred, &new_tablets);
+        unlocker_out.AddTablets(new_tablets);
         break;
-
+      }
       default:
         VLOG(2) << "Nothing to do for tablet " << tablet->tablet_id() << ": state = "
                 << SysTabletsEntryPB_State_Name(t_state);
@@ -2765,7 +2797,7 @@ Status CatalogManager::ProcessPendingAssignments(
     // If there was an error, abort any mutations started by the
     // current task.
     vector<string> tablet_ids_to_remove;
-    for (scoped_refptr<TabletInfo>& new_tablet : new_tablets) {
+    for (const auto& new_tablet : unlocker_out) {
       TableInfo* table = new_tablet->table().get();
       TableMetadataLock l_table(table, TableMetadataLock::READ);
       if (table->RemoveTablet(
@@ -2787,9 +2819,9 @@ Status CatalogManager::ProcessPendingAssignments(
 
   // Send DeleteTablet requests to tablet servers serving deleted tablets.
   // This is asynchronous / non-blocking.
-  for (const TabletInfo* tablet : deferred.tablets_to_update) {
+  for (TabletInfo* tablet : deferred.tablets_to_update) {
     if (tablet->metadata().dirty().is_deleted()) {
-      DeleteTabletReplicas(tablet, tablet->metadata().dirty().pb.state_msg());
+      SendDeleteTabletRequest(tablet, tablet->metadata().dirty().pb.state_msg());
     }
   }
   // Send the CreateTablet() requests to the servers. This is asynchronous / non-blocking.

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/af8a4af0/src/kudu/master/catalog_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index ce56c97..a7eb896 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -510,10 +510,8 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
 
   void NewReplica(TSDescriptor* ts_desc, const ReportedTabletPB& report, TabletReplica* replica);
 
-  // Extract the set of tablets that can be deleted and the set of tablets
-  // that must be processed because not running yet.
-  void ExtractTabletsToProcess(std::vector<scoped_refptr<TabletInfo> > *tablets_to_delete,
-                               std::vector<scoped_refptr<TabletInfo> > *tablets_to_process);
+  // Extract the set of tablets that must be processed because not running yet.
+  void ExtractTabletsToProcess(std::vector<scoped_refptr<TabletInfo>>* tablets_to_process);
 
   // Task that takes care of the tablet assignments/creations.
   // Loops through the "not created" tablets and sends a CreateTablet() request.
@@ -578,21 +576,23 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
   // tablet.
   void SendAlterTabletRequest(const scoped_refptr<TabletInfo>& tablet);
 
-  // Request tablet servers to delete all replicas of the tablet.
-  void DeleteTabletReplicas(const TabletInfo* tablet, const std::string& msg);
-
-  // Marks each of the tablets in the given table as deleted and triggers requests
-  // to the tablet servers to delete them.
-  void DeleteTabletsAndSendRequests(const scoped_refptr<TableInfo>& table);
-
-  // Send the "delete tablet request" to the specified TS/tablet.
-  // The specified 'reason' will be logged on the TS.
-  void SendDeleteTabletRequest(const std::string& tablet_id,
-                               tablet::TabletDataState delete_type,
-                               const boost::optional<int64_t>& cas_config_opid_index_less_or_equal,
-                               const scoped_refptr<TableInfo>& table,
-                               TSDescriptor* ts_desc,
-                               const std::string& reason);
+  // Send the "delete tablet request" to all replicas of all tablets of the
+  // specified table.
+  void SendDeleteTableRequest(const scoped_refptr<TableInfo>& table,
+                              const std::string& deletion_msg);
+
+  // Send the "delete tablet request" to all replicas of the specified tablet.
+  void SendDeleteTabletRequest(const scoped_refptr<TabletInfo>& tablet,
+                               const std::string& deletion_msg);
+
+  // Send the "delete tablet request" to a particular replica (i.e. TS and
+  // tablet combination). The specified 'reason' will be logged on the TS.
+  void SendDeleteReplicaRequest(const std::string& tablet_id,
+                                tablet::TabletDataState delete_type,
+                                const boost::optional<int64_t>& cas_config_opid_index_less_or_equal,
+                                const scoped_refptr<TableInfo>& table,
+                                TSDescriptor* ts_desc,
+                                const std::string& reason);
 
   // Start a task to change the config to add an additional voter because the
   // specified tablet is under-replicated.

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/af8a4af0/src/kudu/master/master-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/master-test.cc b/src/kudu/master/master-test.cc
index 01684d4..af5c986 100644
--- a/src/kudu/master/master-test.cc
+++ b/src/kudu/master/master-test.cc
@@ -41,8 +41,10 @@ using kudu::rpc::MessengerBuilder;
 using kudu::rpc::RpcController;
 using std::shared_ptr;
 using std::string;
+using strings::Substitute;
 
 DECLARE_bool(catalog_manager_check_ts_count_for_create_table);
+DECLARE_double(sys_catalog_fail_during_write);
 
 namespace kudu {
 namespace master {
@@ -539,5 +541,310 @@ TEST_F(MasterTest, TestGetTableSchemaIsAtomicWithCreateTable) {
   t->Join();
 }
 
+// Verifies that on-disk master metadata is self-consistent and matches a set
+// of expected contents.
+//
+// Sample usage:
+//
+//   MasterMetadataVerifier v(live, dead);
+//   sys_catalog->VisitTables(&v);
+//   sys_catalog->VisitTablets(&v);
+//   ASSERT_OK(v.Verify());
+//
+class MasterMetadataVerifier : public TableVisitor,
+                               public TabletVisitor {
+ public:
+  MasterMetadataVerifier(const unordered_set<string>& live_table_names,
+                         const multiset<string>& dead_table_names)
+    : live_table_names_(live_table_names),
+      dead_table_names_(dead_table_names) {
+  }
+
+  virtual Status VisitTable(const std::string& table_id,
+                            const SysTablesEntryPB& metadata) OVERRIDE {
+    InsertOrDie(&visited_tables_by_id_, table_id,
+                { table_id, metadata.name(), metadata.state() });
+    return Status::OK();
+  }
+
+  virtual Status VisitTablet(const std::string& table_id,
+                             const std::string& tablet_id,
+                             const SysTabletsEntryPB& metadata) OVERRIDE {
+    InsertOrDie(&visited_tablets_by_id_, tablet_id,
+                { tablet_id, table_id, metadata.state() });
+    return Status::OK();
+  }
+
+  Status Verify() {
+    RETURN_NOT_OK(VerifyTables());
+    RETURN_NOT_OK(VerifyTablets());
+    return Status::OK();
+  }
+
+ private:
+  Status VerifyTables() {
+    unordered_set<string> live_visited_table_names;
+    multiset<string> dead_visited_table_names;
+
+    for (const auto& entry : visited_tables_by_id_) {
+      const Table& table = entry.second;
+      switch (table.state) {
+        case SysTablesEntryPB::RUNNING:
+        case SysTablesEntryPB::ALTERING:
+          InsertOrDie(&live_visited_table_names, table.name);
+          break;
+        case SysTablesEntryPB::REMOVED:
+          // InsertOrDie() doesn't work on multisets, where the returned
+          // element is not an std::pair.
+          dead_visited_table_names.insert(table.name);
+          break;
+        default:
+          return Status::Corruption(
+              Substitute("Table $0 has unexpected state $1",
+                         table.id,
+                         SysTablesEntryPB_State_Name(table.state)));
+      }
+    }
+
+    if (live_visited_table_names != live_table_names_) {
+      return Status::Corruption("Live table name set mismatch");
+    }
+
+    if (dead_visited_table_names != dead_table_names_) {
+      return Status::Corruption("Dead table name set mismatch");
+    }
+    return Status::OK();
+  }
+
+  Status VerifyTablets() {
+    // Every visited table, live or dead, must be referenced by exactly this many tablets.
+    const int kNumExpectedReferences = 3;
+
+    // Build table ID --> table map for use in verification below.
+    unordered_map<string, const Table*> tables_by_id;
+    for (const auto& entry : visited_tables_by_id_) {
+      InsertOrDie(&tables_by_id, entry.second.id, &entry.second);
+    }
+
+    map<string, int> referenced_tables;
+    for (const auto& entry : visited_tablets_by_id_) {
+      const Tablet& tablet = entry.second;
+      switch (tablet.state) {
+        case SysTabletsEntryPB::PREPARING:
+        case SysTabletsEntryPB::CREATING:
+        case SysTabletsEntryPB::DELETED:
+        {
+          const Table* table = FindPtrOrNull(tables_by_id, tablet.table_id);
+          if (!table) {
+            return Status::Corruption(Substitute(
+                "Tablet $0 belongs to non-existent table $1",
+                tablet.id, tablet.table_id));
+          }
+          string table_state_str = SysTablesEntryPB_State_Name(table->state);
+          string tablet_state_str = SysTabletsEntryPB_State_Name(tablet.state);
+
+          // PREPARING or CREATING tablets must be members of RUNNING or
+          // ALTERING tables.
+          //
+          // DELETED tablets must be members of REMOVED tables.
+          if (((tablet.state == SysTabletsEntryPB::PREPARING ||
+                tablet.state == SysTabletsEntryPB::CREATING) &&
+               (table->state != SysTablesEntryPB::RUNNING &&
+                table->state != SysTablesEntryPB::ALTERING)) ||
+              (tablet.state == SysTabletsEntryPB::DELETED &&
+               table->state != SysTablesEntryPB::REMOVED)) {
+            return Status::Corruption(
+                Substitute("Unexpected states: table $0=$1, tablet $2=$3",
+                           table->id, table_state_str,
+                           tablet.id, tablet_state_str));
+          }
+
+          referenced_tables[tablet.table_id]++;
+          break;
+        }
+        default:
+          return Status::Corruption(
+              Substitute("Tablet $0 has unexpected state $1",
+                         tablet.id,
+                         SysTabletsEntryPB_State_Name(tablet.state)));
+      }
+    }
+
+    for (const auto& entry : tables_by_id) {
+      int num_refs = FindWithDefault(referenced_tables, entry.first, 0);
+      if (num_refs != kNumExpectedReferences) {
+        return Status::Corruption(Substitute("Table $0 has bad reference count ($1 instead of $2)",
+                                             entry.first, num_refs, kNumExpectedReferences));
+      }
+    }
+    return Status::OK();
+  }
+
+  // Names of tables that are thought to be created and never deleted.
+  const unordered_set<string> live_table_names_;
+
+  // Names of tables that are thought to be deleted. A table with a given name
+  // could be deleted more than once.
+  const multiset<string> dead_table_names_;
+
+  // Table ID to table map populated during VisitTables().
+  struct Table {
+    string id;
+    string name;
+    SysTablesEntryPB::State state;
+  };
+  unordered_map<string, Table> visited_tables_by_id_;
+
+  // Tablet ID to tablet map populated during VisitTablets().
+  struct Tablet {
+    string id;
+    string table_id;
+    SysTabletsEntryPB::State state;
+  };
+  unordered_map<string, Tablet> visited_tablets_by_id_;
+};
+
+TEST_F(MasterTest, TestMasterMetadataConsistentDespiteFailures) {
+  const Schema kTableSchema({ ColumnSchema("key", INT32),
+                              ColumnSchema("v1", UINT64),
+                              ColumnSchema("v2", STRING) },
+                            1);
+
+  // Random table names are drawn uniformly from a small range so that name
+  // collisions happen occasionally; the test must cope with them.
+  const int kUniformBound = 25;
+
+  // Make roughly a quarter of sys_catalog writes fail via fault injection.
+  FLAGS_sys_catalog_fail_during_write = 0.25;
+  int num_injected_failures = 0;
+
+  // Tracks all "live" tables (i.e. created and not yet deleted).
+  vector<string> table_names;
+
+  // Tracks all deleted tables. A given name may have been deleted more
+  // than once.
+  multiset<string> deleted_table_names;
+  Random r(SeedRandom());
+
+  // Hammer the master with random create/rename/delete operations until the deadline.
+  MonoDelta time_to_run = MonoDelta::FromSeconds(AllowSlowTests() ? 10 : 1);
+  MonoTime deadline = MonoTime::Now(MonoTime::FINE);
+  deadline.AddDelta(time_to_run);
+  while (MonoTime::Now(MonoTime::FINE).ComesBefore(deadline)) {
+    int next_action = r.Uniform(3);
+    switch (next_action) {
+      case 0:
+      {
+        // Create a new table with a random name and three tablets (split at keys 10 and 20).
+        //
+        // No name collision checking, so this table may already exist.
+        CreateTableRequestPB req;
+        CreateTableResponsePB resp;
+        RpcController controller;
+
+        req.set_name(Substitute("table-$0", r.Uniform(kUniformBound)));
+        ASSERT_OK(SchemaToPB(kTableSchema, req.mutable_schema()));
+        RowOperationsPBEncoder encoder(req.mutable_split_rows());
+        KuduPartialRow row(&kTableSchema);
+        ASSERT_OK(row.SetInt32("key", 10));
+        encoder.Add(RowOperationsPB::SPLIT_ROW, row);
+        ASSERT_OK(row.SetInt32("key", 20));
+        encoder.Add(RowOperationsPB::SPLIT_ROW, row);
+
+        ASSERT_OK(proxy_->CreateTable(req, &resp, &controller));
+        if (resp.has_error()) {
+          Status s = StatusFromPB(resp.error().status());
+          ASSERT_TRUE(s.IsAlreadyPresent() || s.IsRuntimeError()) << s.ToString();
+          if (s.IsRuntimeError()) {
+            ASSERT_STR_CONTAINS(s.ToString(),
+                                SysCatalogTable::kInjectedFailureStatusMsg);
+            num_injected_failures++;
+          }
+        } else {
+          table_names.push_back(req.name());
+        }
+        break;
+      }
+      case 1:
+      {
+        // Rename a random table to some random name.
+        //
+        // No name collision checking, so the new table name may already exist.
+        int num_tables = table_names.size();
+        if (num_tables == 0) {
+          break;
+        }
+        int table_idx = r.Uniform(num_tables);
+        AlterTableRequestPB req;
+        AlterTableResponsePB resp;
+        RpcController controller;
+
+        req.mutable_table()->set_table_name(table_names[table_idx]);
+        req.set_new_table_name(Substitute("table-$0", r.Uniform(kUniformBound)));
+        ASSERT_OK(proxy_->AlterTable(req, &resp, &controller));
+        if (resp.has_error()) {
+          Status s = StatusFromPB(resp.error().status());
+          ASSERT_TRUE(s.IsAlreadyPresent() || s.IsRuntimeError()) << s.ToString();
+          if (s.IsRuntimeError()) {
+            ASSERT_STR_CONTAINS(s.ToString(),
+                                SysCatalogTable::kInjectedFailureStatusMsg);
+            num_injected_failures++;
+          }
+        } else {
+          table_names[table_idx] = req.new_table_name();
+        }
+        break;
+      }
+      case 2:
+      {
+        // Delete a random live table; on success move its name to the deleted set.
+        int num_tables = table_names.size();
+        if (num_tables == 0) {
+          break;
+        }
+        int table_idx = r.Uniform(num_tables);
+        DeleteTableRequestPB req;
+        DeleteTableResponsePB resp;
+        RpcController controller;
+
+        req.mutable_table()->set_table_name(table_names[table_idx]);
+        ASSERT_OK(proxy_->DeleteTable(req, &resp, &controller));
+        if (resp.has_error()) {
+          Status s = StatusFromPB(resp.error().status());
+          ASSERT_TRUE(s.IsRuntimeError()) << s.ToString();
+          ASSERT_STR_CONTAINS(s.ToString(),
+                              SysCatalogTable::kInjectedFailureStatusMsg);
+          num_injected_failures++;
+        } else {
+          deleted_table_names.insert(table_names[table_idx]);
+          table_names[table_idx] = table_names.back();
+          table_names.pop_back();
+        }
+        break;
+      }
+      default:
+        LOG(FATAL) << "Cannot reach here!";
+    }
+  }
+
+  // Injected failures are random, but with a 0.25 failure fraction and this
+  // many operations, at least one should have been observed.
+  ASSERT_GE(num_injected_failures, 1);
+
+  // Restart the catalog manager to ensure that it can survive reloading the
+  // metadata we wrote to disk.
+  ASSERT_OK(mini_master_->Restart());
+
+  // Walk the on-disk metadata again and check it against the expected live/deleted names.
+  unordered_set<string> live_table_names(table_names.begin(),
+                                         table_names.end());
+  MasterMetadataVerifier verifier(live_table_names, deleted_table_names);
+  SysCatalogTable* sys_catalog =
+      mini_master_->master()->catalog_manager()->sys_catalog();
+  ASSERT_OK(sys_catalog->VisitTables(&verifier));
+  ASSERT_OK(sys_catalog->VisitTablets(&verifier));
+  ASSERT_OK(verifier.Verify());
+}
+
 } // namespace master
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/af8a4af0/src/kudu/master/sys_catalog.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index 553b728..9e5b9b6 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -41,10 +41,16 @@
 #include "kudu/tablet/transactions/write_transaction.h"
 #include "kudu/tserver/tserver.pb.h"
 #include "kudu/util/debug/trace_event.h"
+#include "kudu/util/fault_injection.h"
+#include "kudu/util/flag_tags.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/threadpool.h"
 
+DEFINE_double(sys_catalog_fail_during_write, 0.0,
+              "Fraction of the time when system table writes will fail");
+TAG_FLAG(sys_catalog_fail_during_write, unsafe);
+
 using kudu::consensus::CONSENSUS_CONFIG_COMMITTED;
 using kudu::consensus::ConsensusMetadata;
 using kudu::consensus::RaftConfigPB;
@@ -68,6 +74,8 @@ static const char* const kSysCatalogTableColType = "entry_type";
 static const char* const kSysCatalogTableColId = "entry_id";
 static const char* const kSysCatalogTableColMetadata = "metadata";
 
+const char* SysCatalogTable::kInjectedFailureStatusMsg = "INJECTED FAILURE";
+
 SysCatalogTable::SysCatalogTable(Master* master, MetricRegistry* metrics,
                                  ElectedLeaderCallback leader_cb)
     : metric_registry_(metrics),
@@ -312,6 +320,9 @@ Status SysCatalogTable::WaitUntilRunning() {
 }
 
 Status SysCatalogTable::SyncWrite(const WriteRequestPB *req, WriteResponsePB *resp) {
+  MAYBE_RETURN_FAILURE(FLAGS_sys_catalog_fail_during_write,
+                       Status::RuntimeError(kInjectedFailureStatusMsg));
+
   CountDownLatch latch(1);
   gscoped_ptr<tablet::TransactionCompletionCallback> txn_callback(
     new LatchTransactionCompletionCallback<WriteResponsePB>(&latch, resp));

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/af8a4af0/src/kudu/master/sys_catalog.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/sys_catalog.h b/src/kudu/master/sys_catalog.h
index 6f563e6..7a4878b 100644
--- a/src/kudu/master/sys_catalog.h
+++ b/src/kudu/master/sys_catalog.h
@@ -113,6 +113,7 @@ class SysCatalogTable {
   Status VisitTablets(TabletVisitor* visitor);
 
  private:
+  FRIEND_TEST(MasterTest, TestMasterMetadataConsistentDespiteFailures);
   DISALLOW_COPY_AND_ASSIGN(SysCatalogTable);
 
   friend class CatalogManager;
@@ -188,6 +189,11 @@ class SysCatalogTable {
   Status ReqDeleteTablets(tserver::WriteRequestPB* req,
                           const std::vector<TabletInfo*>& tablets);
 
+  // Special string injected into SyncWrite() random failures (if enabled).
+  //
+  // Only useful for tests.
+  static const char* kInjectedFailureStatusMsg;
+
   // Table schema, without IDs, used to send messages to the TabletPeer
   Schema schema_;
   Schema key_schema_;

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/af8a4af0/src/kudu/util/fault_injection.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/fault_injection.cc b/src/kudu/util/fault_injection.cc
index ffa6cb2..9f331b4 100644
--- a/src/kudu/util/fault_injection.cc
+++ b/src/kudu/util/fault_injection.cc
@@ -79,5 +79,14 @@ void DoInjectRandomLatency(double max_ms) {
   SleepFor(MonoDelta::FromMilliseconds(g_random->NextDoubleFraction() * max_ms));
 }
 
+Status DoMaybeReturnFailure(double fraction,
+                            const Status& bad_status_to_return) {
+  GoogleOnceInit(&g_random_once, InitRandom);
+  // Succeed only when the random draw is at least 'fraction'.
+  const bool succeed = g_random->NextDoubleFraction() >= fraction;
+  if (PREDICT_TRUE(succeed)) return Status::OK();
+  return bad_status_to_return;
+}
+
 } // namespace fault_injection
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/af8a4af0/src/kudu/util/fault_injection.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/fault_injection.h b/src/kudu/util/fault_injection.h
index eba021b..462908b 100644
--- a/src/kudu/util/fault_injection.h
+++ b/src/kudu/util/fault_injection.h
@@ -18,6 +18,11 @@
 #define KUDU_UTIL_FAULT_INJECTION_H
 
 #include "kudu/gutil/macros.h"
+#include "kudu/util/status.h"
+
+// Macros for injecting various kinds of faults with varying probability. If
+// configured with 0 probability, each of these macros is evaluated inline and
+// is fast enough to run even in hot code paths.
 
 // With some probability, crash at the current point in the code
 // by issuing LOG(FATAL).
@@ -29,18 +34,22 @@
 //   DEFINE_double(fault_crash_before_foo, 0.0,
 //                 "Fraction of the time when we will crash before doing foo");
 //   TAG_FLAG(fault_crash_before_foo, unsafe);
-//
-// This macro should be fast enough to run even in hot code paths.
 #define MAYBE_FAULT(fraction_flag) \
   kudu::fault_injection::MaybeFault(AS_STRING(fraction_flag), fraction_flag)
 
 // Inject a uniformly random amount of latency between 0 and the configured
 // number of milliseconds.
-//
-// As with above, if the flag is configured to be <= 0, then this will be evaluated
-// inline and should be fast, even in hot code path.
 #define MAYBE_INJECT_RANDOM_LATENCY(max_ms_flag) \
-  kudu::fault_injection::MaybeInjectRandomLatency(max_ms_flag);
+  kudu::fault_injection::MaybeInjectRandomLatency(max_ms_flag)
+
+// With some probability, return the failure described by 'status_expr'.
+// 'status_expr' is evaluated only when 'fraction_flag' is positive.
+#define MAYBE_RETURN_FAILURE(fraction_flag, status_expr) \
+  do { \
+    if (PREDICT_FALSE((fraction_flag) > 0)) { \
+      RETURN_NOT_OK(kudu::fault_injection::MaybeReturnFailure((fraction_flag), (status_expr))); \
+    } \
+  } while (0)
 
 // Implementation details below.
 // Use the MAYBE_FAULT macro instead.
@@ -50,6 +59,8 @@ namespace fault_injection {
 // Out-of-line implementation.
 void DoMaybeFault(const char* fault_str, double fraction);
 void DoInjectRandomLatency(double max_latency);
+Status DoMaybeReturnFailure(double fraction,
+                            const Status& bad_status_to_return);
 
 inline void MaybeFault(const char* fault_str, double fraction) {
   if (PREDICT_TRUE(fraction <= 0)) return;
@@ -61,6 +72,12 @@ inline void MaybeInjectRandomLatency(double max_latency) {
   DoInjectRandomLatency(max_latency);
 }
 
+inline Status MaybeReturnFailure(double fraction,
+                                 const Status& bad_status_to_return) {
+  const bool injection_disabled = PREDICT_TRUE(fraction <= 0);  // cheap inline check
+  return injection_disabled ? Status::OK() : DoMaybeReturnFailure(fraction, bad_status_to_return);
+}
+
 } // namespace fault_injection
 } // namespace kudu
 #endif /* KUDU_UTIL_FAULT_INJECTION_H */


[3/3] incubator-kudu git commit: design-docs: multi-master for 1.0 release

Posted by ad...@apache.org.
design-docs: multi-master for 1.0 release

Here's the design doc I wrote for multi-master. It describes where we are
now and what we need to do to feel comfortable with the feature in Kudu 1.0.
Notably, client-related discussion is omitted. That's because I'm expecting
to fix it in the next release, so I haven't thought about it much. For now
I'm focusing on the master-side of the feature.

Change-Id: Iad76012977a45370b72a04d608371cecf90442ef
Reviewed-on: http://gerrit.cloudera.org:8080/2527
Tested-by: Adar Dembo <ad...@cloudera.com>
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/73b8cc07
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/73b8cc07
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/73b8cc07

Branch: refs/heads/master
Commit: 73b8cc07ec0fda38a2627d5b7de70b07b7943c1e
Parents: 6e7a04a
Author: Adar Dembo <ad...@cloudera.com>
Authored: Thu Mar 10 20:17:17 2016 -0800
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Wed Apr 27 01:09:52 2016 +0000

----------------------------------------------------------------------
 docs/design-docs/README.md           |   1 +
 docs/design-docs/multi-master-1.0.md | 614 ++++++++++++++++++++++++++++++
 2 files changed, 615 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/73b8cc07/docs/design-docs/README.md
----------------------------------------------------------------------
diff --git a/docs/design-docs/README.md b/docs/design-docs/README.md
index 24566e6..9475641 100644
--- a/docs/design-docs/README.md
+++ b/docs/design-docs/README.md
@@ -35,3 +35,4 @@ made.
 | [C++ client design and impl. details](cpp-client.md) | Client | N/A |
 | [(old) Heartbeating between tservers and multiple masters](old-multi-master-heartbeating.md) | Master | [gerrit](http://gerrit.cloudera.org:8080/2495) |
 | [Scan Token API](scan-tokens.md) | Client | [gerrit](http://gerrit.cloudera.org:8080/2443) |
+| [Full multi-master support for Kudu 1.0](multi-master-1.0.md) | Master, Client | [gerrit](http://gerrit.cloudera.org:8080/2527) |

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/73b8cc07/docs/design-docs/multi-master-1.0.md
----------------------------------------------------------------------
diff --git a/docs/design-docs/multi-master-1.0.md b/docs/design-docs/multi-master-1.0.md
new file mode 100644
index 0000000..11101dc
--- /dev/null
+++ b/docs/design-docs/multi-master-1.0.md
@@ -0,0 +1,614 @@
+<!---
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Multi-master support for Kudu 1.0
+
+## Background
+
+Kudu's design avoids a single point of failure via multiple Kudu masters.
+Just as with tablets, master metadata is persisted to disk and replicated
+via Raft consensus, and so a deployment of **2N+1** masters can tolerate up
+to **N** failures.
+
+By the time Kudu's first beta launched, support for multiple masters had
+been implemented but was too fragile to be anything but experimental. The
+rest of this document describes the gaps that must be filled before
+multi-master support is ready for production, and lays out a plan for how
+to fill them.
+
+## Gaps in the master
+
+### Current design
+
+At startup, a master Raft configuration will elect a leader master. The
+leader master is responsible for servicing both tserver heartbeats and
+client requests. The follower masters participate in Raft consensus and
+replicate metadata, but are otherwise idle. Any heartbeats or client
+requests they receive are rejected.
+
+All persistent master metadata is stored in a single replicated tablet.
+Every row in this tablet represents either a table or a tablet. Table
+records include unique table identifiers, the table's schema, and other bits
+of information. Tablet records include a unique identifier, the tablet's
+Raft configuration, and other information.
+
+What master metadata is replicated?
+
+1. Table and tablet existence, via **CreateTable()** and **DeleteTable()**.
+   Every new tablet record also includes an initial Raft configuration.
+2. Schema changes, via **AlterTable()** and tserver heartbeats.
+3. Tablet server Raft configuration changes, via tserver heartbeats. These
+   include both the list of peers (may have changed due to
+   under-replication) as well as the current leader (may have changed due to
+   an election).
+
+Scanning the master tablet for every heartbeat or request would be slow,
+so the leader master caches all master metadata in memory. The caches are
+only updated after a metadata change is successfully replicated; in this
+way they are always consistent with the on-disk tablet. When a new leader
+master is elected, it scans the entire master tablet and uses the metadata
+to rebuild its in-memory caches.
+
+To understand how the in-memory caches work, let's start with the different
+kinds of information that are cached:
+
+1. Tablet server instance. Is uniquely identified by the tserver's UUID and
+   includes general metadata about the tserver, how recently this master
+   received a heartbeat from the tserver, and proxy objects that the master
+   uses to communicate with the tserver.
+2. Table instance. Is uniquely identified by the table's ID and includes
+   general metadata about the table.
+3. Tablet instance. Is uniquely identified by the tablet's ID and includes
+   general metadata about the tablet along with its current replicas.
+
+Now, let's describe the various data structures that store this information:
+
+1. Global: map of tserver UUID to tserver instance.
+2. Global: map of table ID to table instance. All tables are present, even
+   deleted tables.
+3. Global: map of table name to table instance. Deleted tables are omitted,
+   otherwise a new table could collide with a deleted one.
+4. Global: map of tablet ID to tablet instance. All tablets are present,
+   even deleted ones.
+5. Per-table instance: map of tablet ID to tablet instance. Deleted tablets
+   are retained here but are not inserted the next time master metadata is
+   reloaded from disk.
+6. Per-tablet instance: map of tserver UUID to tablet replica.
+
+With few exceptions (detailed below), these caches behave more or less as
+one would expect. For example, a successful **CreateTable()** yields table,
+tablet, and replica instances, all of which are mapped accordingly. A
+heartbeat from a tserver may update a tablet's replica map if that tablet
+elected a new leader replica. And so on.
+
+All tservers start up with location information for the entire master Raft
+configuration, but are only responsible for heartbeating to the leader
+master. Prior to the first heartbeat, a tserver must determine which of the
+masters is the leader master. After that, the tserver will send heartbeats
+to that master until it fails or steps down, at which point
+the tserver must determine the new leader master and the cycle
+repeats. Follower masters ignore all heartbeats.
+
+The information communicated in a tserver's heartbeat varies. Normally the
+enclosed tablet report is "incremental" in that it only includes tablets
+that have undergone Raft configuration or role changes. In rarer
+circumstances the tablet report will be "full" and include every
+tablet. Importantly, incremental reports are "edge" triggered; that is,
+after a tablet is included in an incremental report **N**, it is omitted
+from incremental report **N+1**. Full tablet reports are sent when one of
+the following conditions is met:
+
+1. The master requests it in the previous heartbeat response, because the
+   master is seeing this tserver for the first time.
+2. The tserver has just started.
+
+Clients behave much like tservers: they are configured a priori with the
+entire master Raft configuration, must always communicate with the leader
+master, and will retry their requests until the new leader master is found.
+
+### Known issues
+
+#### KUDU-1358: **CreateTable()** may fail following a master election
+
+One of the aforementioned in-memory caches keeps track of all known tservers
+and their "liveness" (i.e. how likely they are to be alive). This cache is
+*NOT* rebuilt using persistent master metadata; instead, it is updated
+whenever an unknown tserver heartbeats to the leader master.
+**CreateTable()** requests use this cache to determine whether a new table
+can be satisfied using the current cluster size; if not, the request is
+rejected.
+
+Right after a master election, the new leader master may have a cold tserver
+cache if it's never seen any heartbeats before. Until an entire heartbeat
+interval has elapsed, this cold cache may harm **CreateTable()** requests:
+
+1. If insufficient tservers are known to the master, the request will fail.
+2. If not all tservers are known to the master, the cluster may become
+   unbalanced as tablets pile up on a particular subset of tservers.
+
+#### KUDU-1353: **AlterTable()** may get stuck
+
+Another in-memory cache that is *NOT* rebuilt using persistent master
+metadata is the per-tablet replica map. These maps are used to satisfy
+client **GetTableLocations()** and **GetTabletLocations()**
+requests. They're also used to determine the leader replica for various
+master to tserver RPC requests, such as in response to a client's
+**AlterTable()**. Each of these maps is updated during a tserver heartbeat
+using a tablet's latest Raft configuration.
+
+If a new leader master is elected and a tserver is already known to it
+(perhaps because the master had been leader before), every heartbeat it
+receives from that tserver will include an empty tablet report. This is
+problematic for the per-tablet replica maps, which will remain empty until
+tablets show up in a tablet report.
+
+Empty per-tablet replica maps in an otherwise healthy leader master can
+cause a variety of failures:
+
+1. **AlterTable()** requests may time out.
+2. **GetTableLocations()** and **GetTabletLocations()** requests may yield
+   no replica location information.
+3. **DeleteTable()** requests will succeed but won't delete any tablets
+   from tservers until those tablets find their way into tablet reports.
+
+#### KUDU-1374: Operations triggered by heartbeats may go unperformed
+
+As described earlier, the inclusion or exclusion of a tablet in an
+incremental tablet report is edge-triggered, and may result in a state
+changing operation on the tserver, communicated via out-of-band RPC. This
+RPC is retried until it is successful. However, if the leader master dies
+*after* it is able to respond to the tserver's heartbeat but *before* the
+out-of-band RPC is sent, the edge-triggered tablet report may be missed, and
+the state changing operation will not be performed until the next time the
+tablet is included in a tablet report. As tablet report inclusion criteria
+are narrow, operations may be "missed" for quite some time.
+
+These operations include:
+
+1. Some tablet deletions, such as tablets belonging to orphaned tables, or
+   tablets whose deletion RPCs were sent and failed during an earlier
+   **DeleteTable()** request.
+2. Some tablet alters, such as tablets whose alter RPCs were sent and failed
+   during an earlier **AlterTable()** request.
+3. Config changes sent due to under-replicated tablets.
+
+#### KUDU-495: Masters may abort if replication fails
+
+Some master operations will crash the master when replication fails. That's
+because they were implemented with local consensus in mind, wherein a
+replication failure is indicative of a disk failure and recovery is
+unlikely. With multiple masters and Raft consensus, replication may fail if
+the current leader master is no longer the leader master (e.g. it was
+partitioned from the rest of its Raft configuration, which promptly elected
+a new leader), raising the likelihood of a master crash.
+
+### Unimplemented features
+
+#### Master Raft configuration changes
+
+It's not currently possible to add or remove a master from an active Raft
+configuration.
+
+It would be nice to implement this for Kudu 1.0, but it's not a strict
+requirement.
+
+#### KUDU-500: allow followers to handle read-only client operations
+
+Currently followers reject any client operations as the expectation is that
+clients communicate with the leader master. As a performance optimization,
+followers could handle certain "safe" operations (i.e. read-only requests),
+but follower masters must do a better job of keeping their in-memory caches
+up-to-date before this change is made. Moreover, operations should include
+an indication of how stale the follower master's information is allowed to
+be for it to be considered acceptable by the client.
+
+It would be nice to implement this for Kudu 1.0, but it's not a strict
+requirement.
+
+## Gaps in the clients
+
+(TBD)
+
+TODO: JD says the code that detects the current master was partially removed
+from the Java client because it was buggy. Needs further investigation.
+
+## Goals
+
+1. The plan must address all of the aforementioned known issues.
+2. If possible, the plan should implement the missing features too.
+3. The plan's scope should be minimized, provided that doesn't complicate
+   future implementations. In other words, the plan should not force
+   backwards incompatibilities down the line.
+
+## Plan
+
+### Heartbeat to all masters
+
+This is probably the most effective way to address KUDU-1358, and, as a side
+benefit, helps implement the "verify cluster connectivity" feature. [This
+old design document](old-multi-master-heartbeating.md) describes KUDU-1358
+and its solutions in more detail.
+
+With this change, tservers no longer need to "follow the leader" as they will
+heartbeat to every master. However, a couple of things need to change for this
+to work correctly:
+
+#### Follower masters must process heartbeats, at least in part
+
+On a follower master, heartbeats serve only to refresh the master's notion of the
+tserver's liveness; table and tablet information is still replicated from
+the leader master and should be ignored if found in the heartbeat.
+
+#### All state changes taken by a tserver must be fenced
+
+That is, the master and/or tserver must enforce that all actions take effect
+iff they were sent by the master that is currently the leader.
+
+After an exhaustive audit of all master state changes (see appendix A), it
+was determined that the current protection mechanisms built into each RPC
+are sufficient to provide fencing. The one exception is orphaned replica
+deletion done in response to a heartbeat. To protect against that, true
+orphans (i.e. tablets for which no persistent record exists) will not be
+deleted at all. As the master retains deleted table/tablet metadata in
+perpetuity, this should ensure that true orphans appear only under drastic
+circumstances, such as a tserver that heartbeats to the wrong cluster.
+
+The following protection mechanisms are here for historical record; they
+will not be implemented.
+
+##### Alternative fencing mechanisms
+
+One way to do this is by including the current term in every requested state
+change and heartbeat response. Each tserver maintains the current term in
+memory, reset whenever a heartbeat response or RPC includes a later term
+(thus also serving as a "there is a new leader master" notification). If a
+state change request includes an older term, it is rejected. When a tserver
+first starts up, it initializes the current term with whatever term is
+indicated in the majority of the heartbeat responses. In this way it
+can protect itself from a "rogue master" at startup without having to
+persist the current term to disk.
+
+An alternative to the above fencing protocol is to ensure that the leader
+master replicates via Raft before triggering a state change. It doesn't
+matter what is replicated; a successful replication asserts that this master
+is still the leader. However, our Raft implementation doesn't currently
+allow for replicating no-ops (i.e. log entries that needn't be persisted).
+Moreover, this is effectively an implementation of "leader leases" (in that
+a successful replication grants the leader a "lease" to remain leader for at
+least one Raft replication interval), but one that the rest of Kudu must be
+made aware of in order to be fully robust.
+
+### Send full heartbeats to newly elected leader masters
+
+To address KUDU-1374, when a tserver passively detects that there's a new
+leader master (i.e. step #2 above), it should send it a full heartbeat. This
+will ensure that any heartbeat-triggered actions intended but not taken by
+the old leader master are reinitiated by the new one.
+
+### Ensure each logical operation is replicated as one batch
+
+Kudu doesn't yet support atomic multi-row transactions, but all row
+operations bound for one tablet and batched into one Write RPC are combined
+into one logical transaction. This property is useful for multi-master
+support as all master metadata is encapsulated into a single tablet. With
+some refactoring, it is possible to ensure that any logical operation
+(e.g. creating a table) is encapsulated into a single RPC. Doing so would
+obviate the need for "roll forward" repair of partially replicated
+operations during metadata load and is necessary to address KUDU-495.
+
+Some repair is still necessary for table-wide operations. These complete on
+a tablet by tablet basis and thus it is possible for partially created,
+altered, or deleted tables to exist at any point in time. However, the
+repair is a natural course of action taken by the leader master:
+
+1. During a tablet report: for example, if the master sees a tablet without
+   the latest schema, it'll send that tablet an idempotent alter RPC.
+   Further, thanks to the above full heartbeat change, the new leader master
+   will have an opportunity to roll forward such tables on master failover.
+2. In the background: the master will periodically scan its in-memory state
+   looking for tablets that have yet to be reported. If it finds one, it
+   will be given a "nudge"; the master will send a create tablet RPC to the
+   appropriate tserver. This scanning continues in a new leader master
+   following master failover.
+
+All batched logical operations include one table entry and *N* tablet
+entries, where *N* is the number of tablets in the table. These entries are
+encapsulated in a WriteRequestPB that is replicated by the leader master to
+follower masters. When *N* is very large, it is conceivable for the
+WriteRequestPB to exceed the maximum size of a Kudu RPC. To determine just
+how likely this is, replication RPC sizes were measured in the creation of a
+table with 1000 tablets and a simple three-column schema. The results: the
+replication RPC clocked in at ~117 KB, a far cry from the 8 MB maximum RPC
+size. Thus, a batch-based approach should be reasonable for today's
+scale targets.
+
+### Remove some unnecessary in-memory caches
+
+To fix KUDU-1353, the per-tablet replica locations could be removed entirely.
+The same information is already present in each tablet instance, just not in
+an easy-to-use map form. The only downside is that operations that previously
+used the cached locations would need to perform more lookups into the tserver
+map, to resolve tserver UUIDs into instances. We think this is a reasonable
+trade-off, however, as the tserver map should be hot.
+
+An alternative is to rebuild the per-tablet replica locations on metadata
+load, but the outright removal of that cached data is a simpler solution.
+
+The tserver cache could also be rebuilt, but:
+
+1. It will be incomplete, as only the last known RPC address (and none of
+   the HTTP addresses) is persisted. Additionally, addressing KUDU-418 may
+   require the removal of the last known RPC address from the persisted
+   state, at which point there's nothing worth rebuilding.
+2. The tserver cache is expected to be warm from the moment any master
+   (including followers) starts up due to "Heartbeat to all masters" above.
+
+Note: in-memory caches belonging to a former leader master will, by
+definition, contain stale information. These caches could be
+cleared following an election, but it shouldn't matter either way as this
+master is no longer servicing client requests or tserver heartbeats.
+
+### Ensure strict ordering for all state changes
+
+Master state change operations should adhere to the following contract:
+
+1. Acquire locks on the relevant table and/or tablets. Which locks and
+   whether the locks are held for reading or writing depends on the
+   operation. For example, **DeleteTable()** acquires locks for writing on
+   the table and all of its tablets. Table locks must be acquired before
+   tablet locks.
+2. Mutate in-memory state belonging to the table and/or tablets. These
+   mutations are made via COW so that concurrent readers continue to see
+   only "committed" data.
+3. Replicate all of the mutations via Raft consensus in a single batch. If
+   the replication fails, the overall operation fails, the mutations are
+   discarded, and the locks are released.
+4. Replication has succeeded; the operation may not fail beyond this point.
+5. Commit the mutations. If both table and tablet mutations are committed,
+   tablet mutations must come first. Any other in-memory changes must be
+   performed now.
+6. The success of the operation is now consistent on this master in both
+   on-disk state as well as in-memory state.
+7. Send RPCs to the tservers (e.g. **DeleteTable()** will now send delete
+   tablet RPCs to each tablet in the table). The work done by an RPC must
+   take place, either by retrying the RPC until it succeeds, or through some
+   other mechanism, such as sending a new RPC in response to a heartbeat.
+
+Generally speaking, this contract is upheld throughout the master. However, a detailed
+audit (see appendix B) of the master has revealed a few exceptions:
+
+1. During **CreateTable()**, intermediate table and tablet state is made
+   visible prior to replication.
+2. During **DeleteTable()**, RPCs are sent prior to the committing of
+   mutations.
+3. When background scanning for newly created tablets, the logic that
+   "replaces" timed out tablets makes the replacements visible prior to
+   replication.
+
+To prevent clients from seeing intermediate state and other potential
+issues, these operations must be made to adhere to the above contract.
+
+## Appendix A: Fencing audit and discussion
+
+To understand which master operations need to be fenced, we ask the following
+key questions:
+
+1. What decisions can a master make unilaterally (i.e. without first
+   replicating so as to establish consensus)?
+2. When making such decisions, does the master at least consult past replicated
+   state first? If it does, would "stale" state yield incorrect decisions?
+3. If it doesn't (or can't) consult replicated state, are the external actions
+   performed as a result of the decision safe?
+
+We identified the set of potentially problematic external actions as those taken
+by the master during tablet reports.
+
+We ruled out ChangeConfig; it is safe due to the use of CAS on the last change
+config opid (protects against two leader masters both trying to add a server),
+and because if the master somehow added a redundant server, in the worst case
+the new replica will be deleted the next time it heartbeats.
+
+That left DeleteReplica, which is called under the following circumstances:
+
+1. When the master can't find any record of the replica's tablet or its table,
+   it is deleted.
+2. When the persistent state says that the replica (or its table) has been
+   deleted, it is deleted.
+3. When the persistent state says that the replica is no longer part of the Raft
+   config, it is deleted.
+4. When the persistent state includes replicas that aren't in the latest Raft
+   config, they are deleted.
+
+Like ChangeConfig, cases 3 and 4 are protected with a CAS. Cases 1 and 2 are
+not, but case 2 falls under key question #2 above: if persistent state is consulted
+and the decision is made to delete a replica, that decision is correct and
+cannot become incorrect (i.e. under no circumstance would a tablet become
+"undeleted").
+
+That leaves case 1 as the only instance that needs additional fencing. We could
+implement leader leases as described earlier or "current term" checking to
+protect against it.
+
+Or, we could 1) continue our current policy of retaining persistent state of
+deleted tables/tablets forever, and 2) change the master not to delete
+tablets for which it has no records. If we always have the persistent state
+for deleted tables, all instances of case 1 become case 2 unless there's
+some drastic problem (e.g. tservers are heartbeating to the wrong master),
+in which case not deleting the tablets is probably the right thing to do.
+
+## Appendix B: Master operation audit and analysis
+
+The following are detailed audits of how each master operation works today.
+The description of each operation is followed by a list of potential
+improvements, all of which were already incorporated into the above
+plan. These audits may be useful for understanding the plan.
+
+### Creating a new table
+
+#### CreateTable() RPC
+
+1. create table in state UNKNOWN, begin mutation to state PREPARING
+2. create tablets in state UNKNOWN, begin mutation to state PREPARING
+3. update in-memory maps (table by id/name, tablet by id)
+   - new table and tablets are now visible with UNKNOWN state and no
+     useful metadata (table schema, name, tablet partitions, etc.)
+4. replicate new tablets
+5. change table from state PREPARING to RUNNING
+6. replicate new table
+7. commit mutation for table
+   - new table and tablets are visible, table with full metadata in RUNNING
+     state but tablets still in UNKNOWN state without metadata
+8. commit mutations for tablets
+   - new table and tablets are visible with full metadata, table in
+     RUNNING state, tablets in PREPARING state (without consensus state)
+9. wake up bg_tasks thread
+
+Potential improvements:
+
+1. The two replications can be safely combined into one replication
+2. Consumers of table and tablet in-memory state must be prepared for
+   UNKNOWN intermediate state without metadata, as well as lack of consensus
+   state. This can be addressed by:
+   1. Adding a new global in-memory unordered_set to "lock" a table's name
+      while creating it. When the create is finished (regardless of
+      success), the name is unlocked. We could even reuse the tablet
+      LockManager for this, as it has no real tablet dependencies
+   2. Moving step #3 to after step #7
+
+#### Background task scanning
+
+1. for each tablet t in state PREPARING:
+   - change t from state PREPARING to CREATING
+2. for each tablet t_old in state CREATING:
+   - if t_old timed out:
+     1. create tablet t_new in state UNKNOWN, begin mutation to state PREPARING
+     2. add t_new to table's tablet_map
+        - t_new is now visible when operating on all of table's tablets, but
+          is in UNKNOWN state and has no useful metadata
+     3. update in-memory tablet_by_id map with t_new
+        - t_new is now visible to lookups by tablet_id, still UNKNOWN and
+          no useful metadata
+     4. change t_old from state CREATING to REPLACED
+     5. change t_new from state PREPARING to CREATING
+3. for each tablet t in state CREATING:
+   - reset t committed_consensus_state:
+     1. reset term to min term
+     2. reset consensus type to local or distributed
+     3. reset index to invalid op index
+     4. select peers
+4. replicate new and existing tablets
+5. if error in replica selection or replication:
+   1. update each table's in-memory tablet_map to remove t_new tablets (step 2)
+      - t_new tablets still visible to lookups by tablet_id
+   2. update tablet_by_id map to remove t_new tablets (step 2)
+      - t_new tablets no longer visible
+6. send DeleteTablet() RPCs for all t_old tablets (step 2)
+7. send CreateTablet() RPCs for all created tablets
+8. commit mutations for new and existing tablets
+   - replacement tablets from step #2 now have full metadata, all tablets now
+     have visible consensus state
+
+Potential improvements:
+
+1. All replication is already atomic; nothing to change here
+2. Steps 6 and 7 should probably be reordered after Step 8
+3. t_new can expose intermediate state to consumers, much like in
+   CreateTable(). Is this safe?
+   1. Remove step #5
+   2. After step #8 (but before RPCs), insert steps #2b and #2c
+
+### Deleting a table
+
+#### DeleteTable() RPC
+
+1. change table from state RUNNING (or ALTERING) to REMOVED
+2. replicate table
+3. commit mutation for table
+   - new state (REMOVED) is now visible
+4. for each tablet t:
+   1. Send DeleteTablet() RPC for t
+   2. change t from state RUNNING to DELETED
+   3. replicate t
+   4. commit mutation for t
+      - new state (DELETED) is now visible
+
+Potential improvements:
+
+1. Table and tablet replications can be safely combined, provided we invert
+   the commit order and make sure the rest of the master is OK with that
+2. DeleteTablet() RPC should come after tablet mutation is committed
+
+### Altering a table
+
+#### AlterTable() RPC
+
+1. if requested change to table name:
+   - change name
+2. if requested change to table schema:
+   1. reset fully_applied_schema with current schema
+   2. reset current schema
+3. increment schema version
+4. increment next column id
+5. change table from state RUNNING to ALTERING
+6. replicate table
+7. commit mutation to table
+   - new state (ALTERING), schema, etc. are now visible
+8. Send AlterTablet() RPCs for each tablet
+
+No potential improvements
+
+### Heartbeating
+
+#### Heartbeat for tablet t
+
+1. if t fails by_id lookup:
+   - send DeleteTablet() RPC for t
+2. if t's table does not exist:
+   - send DeleteTablet() RPC for t
+3. if t is DELETED or REPLACED, or t's table is REMOVED:
+   - send DeleteTablet() RPC for t
+4. if t is no longer in the list of peers:
+   - send DeleteTablet() RPC for t
+5. if t is CREATING and has a leader:
+   - change t from CREATING to RUNNING
+6. change consensus state
+   1. reset committed_consensus_state (if it exists in report)
+   2. update tablet replica locations
+      - new replica locations now immediately visible
+   3. if t is no longer in the list of peers:
+      - send DeleteTablet() RPC for t
+   4. if tablet is under-replicated
+      - send ConfigChange() RPC for t
+7. update t
+8. commit mutation for t
+   - new state and committed consensus state are now visible
+9. if t's reported schema version isn't the latest:
+   - send AlterTablet() RPC for t
+10. else:
+    1. update tablet schema version
+       - new schema version is immediately visible
+    2. if table is in state ALTERING and all tablets have latest schema version:
+       1. clear fully_applied_schema
+       2. change table from state ALTERING to RUNNING
+       3. replicate table
+       4. commit mutation for table
+          - new state (RUNNING) and empty fully_applied_schema now visible
+
+No potential improvements. One replication per tablet (as written) is OK
+because:
+
+1. If replication fails but master still alive: tserver will retry same
+   kind of heartbeat, giving master a chance to replicate again
+2. If replication fails and master dies: with proposed "send full heartbeat
+   on new leader master" change, failed replications will be retried by new
+   leader master


[2/3] incubator-kudu git commit: master: don't expose any CreateTable() state too early

Posted by ad...@apache.org.
master: don't expose any CreateTable() state too early

This was originally addressed by auditing all catalog manager readers (see
commit f971971), but David pointed out that we could do a better job if we
tracked ongoing table creation independently. With that in mind, here's an
approach that fixes the problem in CreateTable(). It's not absolutely
necessary, but:
1. It is forgiving to readers who forget to check the state of the table.
2. It obviates the need for any abort logic should table creation fail.

The new test highlights the tradeoff inherent in not introducing a new lock
that is used by "losers" to wait on table creation: the operation can fail
in a not quite TABLE_ALREADY_PRESENT but not quite TABLE_NOT_FOUND way. I
chose to express this using a new combination of error and code rather than
introduce a new code altogether.

Change-Id: Ib9e11037e7f8b4c34db5e0f2b5be00f806532365
Reviewed-on: http://gerrit.cloudera.org:8080/2714
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/6e7a04aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/6e7a04aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/6e7a04aa

Branch: refs/heads/master
Commit: 6e7a04aa54d34864b5bbdd486a96024342f6f302
Parents: af8a4af
Author: Adar Dembo <ad...@cloudera.com>
Authored: Fri Apr 1 21:13:43 2016 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Wed Apr 27 01:08:02 2016 +0000

----------------------------------------------------------------------
 src/kudu/master/catalog_manager.cc | 94 +++++++++++++--------------------
 src/kudu/master/catalog_manager.h  | 13 ++---
 src/kudu/master/master-test.cc     | 58 ++++++++++++++++++++
 3 files changed, 101 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6e7a04aa/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 71f51fb..006bf63 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -84,6 +84,7 @@
 #include "kudu/util/logging.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/random_util.h"
+#include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/stopwatch.h"
 #include "kudu/util/thread.h"
 #include "kudu/util/threadpool.h"
@@ -733,42 +734,6 @@ Status CatalogManager::CheckOnline() const {
   return Status::OK();
 }
 
-void CatalogManager::AbortTableCreation(TableInfo* table,
-                                        const vector<TabletInfo*>& tablets) {
-  string table_id = table->id();
-  string table_name = table->mutable_metadata()->mutable_dirty()->pb.name();
-  vector<string> tablet_ids_to_erase;
-  for (TabletInfo* tablet : tablets) {
-    tablet_ids_to_erase.push_back(tablet->tablet_id());
-  }
-
-  LOG(INFO) << "Aborting creation of table '" << table_name << "', erasing table and tablets (" <<
-      JoinStrings(tablet_ids_to_erase, ",") << ") from in-memory state.";
-
-  // Since this is a failed creation attempt, it's safe to just abort
-  // all tasks, as (by definition) no tasks may be pending against a
-  // table that has failed to succesfully create.
-  table->AbortTasks();
-  table->WaitTasksCompletion();
-
-  boost::lock_guard<LockType> l(lock_);
-
-  // Call AbortMutation() manually, as otherwise the lock won't be
-  // released.
-  for (TabletInfo* tablet : tablets) {
-    tablet->mutable_metadata()->AbortMutation();
-  }
-  table->mutable_metadata()->AbortMutation();
-  for (const string& tablet_id_to_erase : tablet_ids_to_erase) {
-    CHECK_EQ(tablet_map_.erase(tablet_id_to_erase), 1)
-        << "Unable to erase tablet " << tablet_id_to_erase << " from tablet map.";
-  }
-  CHECK_EQ(table_names_map_.erase(table_name), 1)
-      << "Unable to erase table named " << table_name << " from table names map.";
-  CHECK_EQ(table_ids_map_.erase(table_id), 1)
-      << "Unable to erase tablet with id " << table_id << " from tablet ids map.";
-}
-
 // Create a new table.
 // See README file in this directory for a description of the design.
 Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req,
@@ -867,7 +832,6 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req,
   }
 
   scoped_refptr<TableInfo> table;
-  vector<TabletInfo*> tablets;
   {
     boost::lock_guard<LockType> l(lock_);
     TRACE("Acquired catalog manager lock");
@@ -880,28 +844,34 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req,
       return s;
     }
 
-    // c. Add the new table in "preparing" state.
-    table = CreateTableInfo(req, schema, partition_schema);
-    table_ids_map_[table->id()] = table;
-    table_names_map_[req.name()] = table;
-
-    // d. Create the TabletInfo objects in state PREPARING.
-    for (const Partition& partition : partitions) {
-      PartitionPB partition_pb;
-      partition.ToPB(&partition_pb);
-      scoped_refptr<TabletInfo> t = CreateTabletInfo(table.get(), partition_pb);
-      tablets.push_back(t.get());
-
-      // Add the new tablet to the catalog-manager-wide map by tablet ID.
-      InsertOrDie(&tablet_map_, t->tablet_id(), std::move(t));
+    // c. Mark the table as being created (if it isn't already).
+    if (!InsertIfNotPresent(&tables_being_created_, req.name())) {
+      s = Status::ServiceUnavailable("Table is currently being created", req.name());
+      SetupError(resp->mutable_error(), MasterErrorPB::TABLE_NOT_FOUND, s);
+      return s;
     }
+  }
 
-    resp->set_table_id(table->id());
+  // Ensure that if we return, we mark this table as no longer being created.
+  auto cleanup = MakeScopedCleanup([&] () {
+    boost::lock_guard<LockType> l(lock_);
+    CHECK_EQ(1, tables_being_created_.erase(req.name()));
+  });
 
-    // Add the tablets to the table's map.
-    table->AddTablets(tablets);
+  // d. Create the in-memory representation of the new table and its tablets.
+  //    It's not yet in any global maps; that will happen in step g below.
+  table = CreateTableInfo(req, schema, partition_schema);
+  vector<TabletInfo*> tablets;
+  vector<scoped_refptr<TabletInfo>> tablet_refs;
+  for (const Partition& partition : partitions) {
+    PartitionPB partition_pb;
+    partition.ToPB(&partition_pb);
+    scoped_refptr<TabletInfo> t = CreateTabletInfo(table.get(), partition_pb);
+    tablets.push_back(t.get());
+    tablet_refs.emplace_back(std::move(t));
   }
-  TRACE("Inserted new table and tablet info into CatalogManager maps");
+  table->AddTablets(tablets);
+  TRACE("Created new table and tablet info");
 
   // NOTE: the table and tablets are already locked for write at this point,
   // since the CreateTableInfo/CreateTabletInfo functions leave them in that state.
@@ -922,7 +892,6 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req,
     s = s.CloneAndPrepend(Substitute("An error occurred while writing to sys-catalog: $0",
                                      s.ToString()));
     LOG(WARNING) << s.ToString();
-    AbortTableCreation(table.get(), tablets);
     CheckIfNoLongerLeaderAndSetupError(s, resp);
     return s;
   }
@@ -935,6 +904,19 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req,
     tablet->mutable_metadata()->CommitMutation();
   }
 
+  // g. Make the new table and tablets visible in the catalog.
+  {
+    boost::lock_guard<LockType> l(lock_);
+
+    table_ids_map_[table->id()] = table;
+    table_names_map_[req.name()] = table;
+    for (const auto& tablet : tablet_refs) {
+      InsertOrDie(&tablet_map_, tablet->tablet_id(), std::move(tablet));
+    }
+  }
+  TRACE("Inserted table and tablets into CatalogManager maps");
+
+  resp->set_table_id(table->id());
   VLOG(1) << "Created table " << table->ToString();
   background_tasks_->Wake();
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6e7a04aa/src/kudu/master/catalog_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index a7eb896..e003994 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -601,13 +601,6 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
 
   std::string GenerateId() { return oid_generator_.Next(); }
 
-  // Abort creation of 'table': abort all mutation for TabletInfo and
-  // TableInfo objects (releasing all COW locks), abort all pending
-  // tasks associated with the table, and erase any state related to
-  // the table we failed to create from the in-memory maps
-  // ('table_names_map_', 'table_ids_map_', 'tablet_map_' below).
-  void AbortTableCreation(TableInfo* table, const std::vector<TabletInfo*>& tablets);
-
   // Conventional "T xxx P yyy: " prefix for logging.
   std::string LogPrefix() const;
 
@@ -615,7 +608,7 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
   // objects have a copy of the string key. But STL doesn't make it
   // easy to make a "gettable set".
 
-  // Lock protecting the various maps below.
+  // Lock protecting the various maps and sets below.
   typedef rw_spinlock LockType;
   mutable LockType lock_;
 
@@ -628,6 +621,10 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
   typedef std::unordered_map<std::string, scoped_refptr<TabletInfo> > TabletInfoMap;
   TabletInfoMap tablet_map_;
 
+  // Names of tables that are currently being created. Only used in
+  // table creation so that transient tables are not made visible.
+  std::unordered_set<std::string> tables_being_created_;
+
   Master *master_;
   Atomic32 closing_;
   ObjectIdGenerator oid_generator_;

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6e7a04aa/src/kudu/master/master-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/master-test.cc b/src/kudu/master/master-test.cc
index af5c986..2315afe 100644
--- a/src/kudu/master/master-test.cc
+++ b/src/kudu/master/master-test.cc
@@ -846,5 +846,63 @@ TEST_F(MasterTest, TestMasterMetadataConsistentDespiteFailures) {
   ASSERT_OK(verifier.Verify());
 }
 
+static void CreateTableOrFail(const char* kTableName,
+                              const Schema* kSchema,
+                              MasterServiceProxy* proxy) {
+  CreateTableRequestPB req;
+  CreateTableResponsePB resp;
+  RpcController controller;
+
+  req.set_name(kTableName);
+  CHECK_OK(SchemaToPB(*kSchema, req.mutable_schema()));
+  CHECK_OK(proxy->CreateTable(req, &resp, &controller));
+  SCOPED_TRACE(resp.DebugString());
+
+  // There are three expected outcomes:
+  //
+  // 1. This thread won the CreateTable() race: no error.
+  // 2. This thread lost the CreateTable() race: TABLE_NOT_FOUND error
+  //    with ServiceUnavailable status.
+  // 3. This thread arrived after the CreateTable() race was already over:
+  //    TABLE_ALREADY_PRESENT error with AlreadyPresent status.
+  if (resp.has_error()) {
+    Status s = StatusFromPB(resp.error().status());
+    string failure_msg = Substitute("Unexpected response: $0",
+                                    resp.DebugString());
+    switch (resp.error().code()) {
+      case MasterErrorPB::TABLE_NOT_FOUND:
+        CHECK(s.IsServiceUnavailable()) << failure_msg;
+        break;
+      case MasterErrorPB::TABLE_ALREADY_PRESENT:
+        CHECK(s.IsAlreadyPresent()) << failure_msg;
+        break;
+      default:
+        FAIL() << failure_msg;
+    }
+  }
+}
+
+TEST_F(MasterTest, TestConcurrentCreateOfSameTable) {
+  const char* kTableName = "testtb";
+  const Schema kTableSchema({ ColumnSchema("key", INT32),
+                              ColumnSchema("v1", UINT64),
+                              ColumnSchema("v2", STRING) },
+                            1);
+
+  // Kick off a bunch of threads all trying to create the same table.
+  vector<scoped_refptr<Thread>> threads;
+  for (int i = 0; i < 10; i++) {
+    scoped_refptr<Thread> t;
+    EXPECT_OK(Thread::Create("test", "test",
+                             &CreateTableOrFail, kTableName, &kTableSchema,
+                             proxy_.get(), &t));
+    threads.emplace_back(std::move(t));
+  }
+
+  for (const auto& t : threads) {
+    t->Join();
+  }
+}
+
 } // namespace master
 } // namespace kudu