Posted to commits@kudu.apache.org by ad...@apache.org on 2016/07/14 21:12:22 UTC

[1/2] incubator-kudu git commit: master: add read-write lock to serialize operations around elections

Repository: incubator-kudu
Updated Branches:
  refs/heads/master 2525ad094 -> 05369115d


master: add read-write lock to serialize operations around elections

This rigmarole began with an investigation into a test failure [1], which
led to a new integration test that hammers VisitTablesAndTablets() while
creating tables. That test revealed other locking issues, which brings us
to where we are now.

This patch introduces a read-write lock to serialize all master operations
so that they fall on one side or the other of a leader election. The idea
is to avoid performing operations concurrently with a reload of the master
metadata; doing so can lead to problems in Shutdown() and (very rarely,
perhaps only conceptually) to inconsistent on-disk state.
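
To make the intent concrete, here is a minimal sketch of the pattern using
standard C++ types (std::shared_mutex in place of Kudu's RWMutex; the class
and method names are illustrative, not the actual catalog manager API):

  #include <mutex>
  #include <shared_mutex>

  // Sketch only: ordinary master operations take the lock shared, while the
  // post-election metadata reload takes it exclusive, so an operation can
  // never straddle a reload.
  class MasterSketch {
   public:
    void HandleRpc() {
      std::shared_lock<std::shared_mutex> l(leader_lock_);  // read side
      // ... serve the operation against in-memory metadata ...
    }

    void OnElectedLeader() {
      std::unique_lock<std::shared_mutex> l(leader_lock_);  // write side
      // ... reload persistent metadata; no operation can interleave here ...
    }

   private:
    std::shared_mutex leader_lock_;
  };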

I was hoping this lock could replace the fencing done by leader_ready_term_,
but eventually reasoned that we need both; without leader_ready_term_
fencing, the master's consensus state machine could fool an operation into
thinking the master became the leader before the metadata was reloaded.
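
Roughly, that fencing boils down to comparing the term in which the metadata
was last reloaded against the current consensus term. A simplified sketch
(the free function and parameter names are illustrative):

  #include <cstdint>

  // Returns true only if this node is the leader AND the metadata reload
  // triggered by winning 'current_term' has already completed, i.e.
  // 'leader_ready_term' has caught up to 'current_term'.
  bool LeaderReadyToServe(bool is_leader,
                          int64_t current_term,
                          int64_t leader_ready_term) {
    if (!is_leader) {
      return false;
    }
    return leader_ready_term == current_term;
  }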

Three other things of note here:
- The new lock is acquired via TryLock() so that, if it cannot be acquired,
  the RPC fails rather than blocks (see the sketch after this list). A
  future patch modifies TSHeartbeat() to partially accept heartbeats even
  if the master is a follower; TryLock() means that a transitioning leader
  pelted with RPCs won't fill up its service queue and can still process
  heartbeats.
- TableInfo's AddTask() and RemoveTask() methods no longer hold the table's
  lock when adding and removing refs from the task, respectively. This is
  the fix for the original test failure.
- When reloading metadata, we now abort all outstanding table tasks to
  avoid orphaning them.
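
As referenced in the first item above, here is a sketch of the try-lock
acquisition on the RPC path (standard C++ types and the helper name are
assumptions; the real code uses Kudu's RWMutex, shared_lock, and Status):

  #include <mutex>
  #include <shared_mutex>
  #include <string>

  // If the lock is held for writing because a metadata reload is in
  // progress, fail fast instead of parking the RPC in the service queue.
  // Returns an error message, or the empty string on success.
  std::string TryServeAsLeader(std::shared_mutex& leader_lock) {
    std::shared_lock<std::shared_mutex> l(leader_lock, std::try_to_lock);
    if (!l.owns_lock()) {
      return "Leader not yet ready to serve requests";
    }
    // ... handle the RPC while the shared lock is held ...
    return "";
  }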

1. http://dist-test.cloudera.org:8080/diagnose?key=224b3aa2-3c87-11e6-9a09-0242ac110001

Change-Id: I5084c09f1a77ccf620fb6cd621094c4778d636f8
Reviewed-on: http://gerrit.cloudera.org:8080/3550
Reviewed-by: Dan Burkert <da...@cloudera.com>
Tested-by: Adar Dembo <ad...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/d1d3cfe7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/d1d3cfe7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/d1d3cfe7

Branch: refs/heads/master
Commit: d1d3cfe77d4fcd4e208cb6214deb654b9e8765fb
Parents: 2525ad0
Author: Adar Dembo <ad...@cloudera.com>
Authored: Thu Jun 30 17:30:21 2016 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Thu Jul 14 21:09:50 2016 +0000

----------------------------------------------------------------------
 .../create-table-stress-test.cc                 |  59 ++++-
 src/kudu/integration-tests/mini_cluster.cc      |   6 +-
 src/kudu/master/catalog_manager.cc              | 219 +++++++++++++------
 src/kudu/master/catalog_manager.h               |  97 +++++++-
 src/kudu/master/master.cc                       |   8 +-
 src/kudu/master/master_service.cc               |  93 +++-----
 6 files changed, 336 insertions(+), 146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d1d3cfe7/src/kudu/integration-tests/create-table-stress-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/create-table-stress-test.cc b/src/kudu/integration-tests/create-table-stress-test.cc
index 31313f4..c740e58 100644
--- a/src/kudu/integration-tests/create-table-stress-test.cc
+++ b/src/kudu/integration-tests/create-table-stress-test.cc
@@ -18,11 +18,13 @@
 #include <glog/stl_logging.h>
 #include <gtest/gtest.h>
 #include <memory>
+#include <thread>
 
 #include "kudu/client/client.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/wire_protocol.h"
 #include "kudu/fs/fs_manager.h"
+#include "kudu/gutil/strings/substitute.h"
 #include "kudu/integration-tests/cluster_itest_util.h"
 #include "kudu/integration-tests/mini_cluster.h"
 #include "kudu/master/master.proxy.h"
@@ -49,9 +51,12 @@ using kudu::rpc::RpcController;
 
 DECLARE_int32(heartbeat_interval_ms);
 DECLARE_bool(log_preallocate_segments);
-DECLARE_bool(enable_remote_bootstrap);
 DEFINE_int32(num_test_tablets, 60, "Number of tablets for stress test");
 
+using std::thread;
+using std::unique_ptr;
+using strings::Substitute;
+
 namespace kudu {
 
 const char* kTableName = "test_table";
@@ -78,10 +83,6 @@ class CreateTableStressTest : public KuduTest {
     // this won't be necessary.
     FLAGS_log_preallocate_segments = false;
 
-    // Workaround KUDU-941: without this, it's likely that while shutting
-    // down tablets, they'll get resuscitated by their existing leaders.
-    FLAGS_enable_remote_bootstrap = false;
-
     KuduTest::SetUp();
     MiniClusterOptions opts;
     opts.num_tablet_servers = 3;
@@ -110,10 +111,10 @@ class CreateTableStressTest : public KuduTest {
 
  protected:
   client::sp::shared_ptr<KuduClient> client_;
-  gscoped_ptr<MiniCluster> cluster_;
+  unique_ptr<MiniCluster> cluster_;
   KuduSchema schema_;
   std::shared_ptr<Messenger> messenger_;
-  gscoped_ptr<MasterServiceProxy> master_proxy_;
+  unique_ptr<MasterServiceProxy> master_proxy_;
   TabletServerMap ts_map_;
 };
 
@@ -127,7 +128,7 @@ void CreateTableStressTest::CreateBigTable(const string& table_name, int num_tab
     split_rows.push_back(row);
   }
 
-  gscoped_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+  unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
   ASSERT_OK(table_creator->table_name(table_name)
             .schema(&schema_)
             .set_range_partition_columns({ "key" })
@@ -296,7 +297,7 @@ TEST_F(CreateTableStressTest, TestGetTableLocationsOptions) {
 
   // Get a single tablet in the middle, make sure we get that one back
 
-  gscoped_ptr<KuduPartialRow> row(schema_.NewRow());
+  unique_ptr<KuduPartialRow> row(schema_.NewRow());
   ASSERT_OK(row->SetInt32(0, half_tablets - 1));
   string start_key_middle;
   ASSERT_OK(row->EncodeRowKey(&start_key_middle));
@@ -315,4 +316,44 @@ TEST_F(CreateTableStressTest, TestGetTableLocationsOptions) {
   }
 }
 
+// Creates tables and reloads on-disk metadata concurrently to test for races
+// between the two operations.
+TEST_F(CreateTableStressTest, TestConcurrentCreateTableAndReloadMetadata) {
+  AtomicBool stop(false);
+
+  thread reload_metadata_thread([&]() {
+    while (!stop.Load()) {
+      CHECK_OK(cluster_->mini_master()->master()->catalog_manager()->VisitTablesAndTablets());
+
+      // Give table creation a chance to run.
+      SleepFor(MonoDelta::FromMilliseconds(1));
+    }
+  });
+  for (int num_tables_created = 0; num_tables_created < 20;) {
+    string table_name = Substitute("test-$0", num_tables_created);
+    LOG(INFO) << "Creating table " << table_name;
+    unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+    Status s = table_creator->table_name(table_name)
+                  .schema(&schema_)
+                  .set_range_partition_columns({ "key" })
+                  .num_replicas(3)
+                  .wait(false)
+                  .Create();
+    if (s.IsServiceUnavailable()) {
+      // The master was busy reloading its metadata. Try again.
+      //
+      // This is a purely synthetic case. In real life, it only manifests at
+      // startup (single master) or during leader failover (multiple masters).
+      // In the latter case, the client will transparently retry to another
+      // master. That won't happen here as we've only got one master, so we
+      // must handle retrying ourselves.
+      continue;
+    }
+    ASSERT_OK(s);
+    num_tables_created++;
+  }
+  stop.Store(true);
+  reload_metadata_thread.join();
+}
+
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d1d3cfe7/src/kudu/integration-tests/mini_cluster.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/mini_cluster.cc b/src/kudu/integration-tests/mini_cluster.cc
index a32287e..7133b8e 100644
--- a/src/kudu/integration-tests/mini_cluster.cc
+++ b/src/kudu/integration-tests/mini_cluster.cc
@@ -40,6 +40,7 @@ namespace kudu {
 
 using client::KuduClient;
 using client::KuduClientBuilder;
+using master::CatalogManager;
 using master::MiniMaster;
 using master::TabletLocationsPB;
 using master::TSDescriptor;
@@ -187,8 +188,9 @@ MiniMaster* MiniCluster::leader_mini_master() {
       if (master->master()->IsShutdown()) {
         continue;
       }
-      if (master->master()->catalog_manager()->IsInitialized() &&
-          master->master()->catalog_manager()->CheckIsLeaderAndReady().ok()) {
+      CatalogManager::ScopedLeaderSharedLock l(
+          master->master()->catalog_manager());
+      if (l.catalog_status().ok() && l.leader_status().ok()) {
         return master;
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d1d3cfe7/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 07d8487..2d09d82 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -339,31 +339,32 @@ void CatalogManagerBgTasks::Shutdown() {
 
 void CatalogManagerBgTasks::Run() {
   while (!NoBarrier_Load(&closing_)) {
-    // Perform assignment processing.
-    if (!catalog_manager_->IsInitialized()) {
-      LOG(WARNING) << "Catalog manager is not initialized!";
-    } else if (catalog_manager_->CheckIsLeaderAndReady().ok()) {
-      std::vector<scoped_refptr<TabletInfo>> to_process;
-
-      // Get list of tablets not yet running.
-      catalog_manager_->ExtractTabletsToProcess(&to_process);
-
-      if (!to_process.empty()) {
-        // Transition tablet assignment state from preparing to creating, send
-        // and schedule creation / deletion RPC messages, etc.
-        Status s = catalog_manager_->ProcessPendingAssignments(to_process);
-        if (!s.ok()) {
-          // If there is an error (e.g., we are not the leader) abort this task
-          // and wait until we're woken up again.
-          //
-          // TODO Add tests for this in the revision that makes
-          // create/alter fault tolerant.
-          LOG(ERROR) << "Error processing pending assignments, aborting the current task: "
-                     << s.ToString();
+    {
+      CatalogManager::ScopedLeaderSharedLock l(catalog_manager_);
+      if (!l.catalog_status().ok()) {
+        LOG(WARNING) << "Catalog manager background task thread going to sleep: "
+                     << l.catalog_status().ToString();
+      } else if (l.leader_status().ok()) {
+        std::vector<scoped_refptr<TabletInfo>> to_process;
+
+        // Get list of tablets not yet running.
+        catalog_manager_->ExtractTabletsToProcess(&to_process);
+
+        if (!to_process.empty()) {
+          // Transition tablet assignment state from preparing to creating, send
+          // and schedule creation / deletion RPC messages, etc.
+          Status s = catalog_manager_->ProcessPendingAssignments(to_process);
+          if (!s.ok()) {
+            // If there is an error (e.g., we are not the leader) abort this task
+            // and wait until we're woken up again.
+            //
+            // TODO Add tests for this in the revision that makes
+            // create/alter fault tolerant.
+            LOG(ERROR) << "Error processing pending assignments, aborting the current task: "
+                       << s.ToString();
+          }
         }
       }
-    } else {
-      VLOG(1) << "We are no longer the leader, aborting the current task...";
     }
 
     // Wait for a notification or a timeout expiration.
@@ -530,7 +531,8 @@ CatalogManager::CatalogManager(Master *master)
   : master_(master),
     rng_(GetRandomSeed32()),
     state_(kConstructed),
-    leader_ready_term_(-1) {
+    leader_ready_term_(-1),
+    leader_lock_(RWMutex::Priority::PREFER_WRITING) {
   CHECK_OK(ThreadPoolBuilder("leader-initialization")
            // Presently, this thread pool must contain only a single thread
            // (to correctly serialize invocations of ElectedAsLeaderCb upon
@@ -640,8 +642,20 @@ void CatalogManager::VisitTablesAndTabletsTask() {
 }
 
 Status CatalogManager::VisitTablesAndTablets() {
+  // Block new catalog operations, and wait for existing operations to finish.
+  std::lock_guard<RWMutex> leader_lock_guard(leader_lock_);
+
+  // This lock is held for the entirety of the function because the calls to
+  // VisitTables and VisitTablets mutate global maps.
   std::lock_guard<LockType> lock(lock_);
 
+  // Abort any outstanding tasks. All TableInfos are orphaned below, so
+  // it's important to end their tasks now; otherwise Shutdown() will
+  // destroy master state used by these tasks.
+  vector<scoped_refptr<TableInfo>> tables;
+  AppendValuesFromMap(table_ids_map_, &tables);
+  AbortAndWaitForAllTasks(tables);
+
   // Clear the existing state.
   table_names_map_.clear();
   table_ids_map_.clear();
@@ -676,26 +690,6 @@ bool CatalogManager::IsInitialized() const {
   return state_ == kRunning;
 }
 
-Status CatalogManager::CheckIsLeaderAndReady() const {
-  std::lock_guard<simple_spinlock> l(state_lock_);
-  if (PREDICT_FALSE(state_ != kRunning)) {
-    return Status::ServiceUnavailable(
-        Substitute("Catalog manager is shutting down. State: $0", state_));
-  }
-  Consensus* consensus = sys_catalog_->tablet_peer_->consensus();
-  ConsensusStatePB cstate = consensus->ConsensusState(CONSENSUS_CONFIG_COMMITTED);
-  string uuid = master_->fs_manager()->uuid();
-  if (PREDICT_FALSE(!cstate.has_leader_uuid() || cstate.leader_uuid() != uuid)) {
-    return Status::IllegalState(
-        Substitute("Not the leader. Local UUID: $0, Consensus state: $1",
-                   uuid, cstate.ShortDebugString()));
-  }
-  if (PREDICT_FALSE(leader_ready_term_ != cstate.current_term())) {
-    return Status::ServiceUnavailable("Leader not yet ready to serve requests");
-  }
-  return Status::OK();
-}
-
 RaftPeerPB::Role CatalogManager::Role() const {
   CHECK(IsInitialized());
   return sys_catalog_->tablet_peer_->consensus()->role();
@@ -720,17 +714,14 @@ void CatalogManager::Shutdown() {
   //
   // There may be an outstanding table visitor thread modifying the table map,
   // so we must make a copy of it before we iterate. It's OK if the visitor
-  // adds more entries to the map even after we finish, but it may not start
-  // any new tasks for those entries.
-  TableInfoMap copy;
+  // adds more entries to the map even after we finish; it won't start any new
+  // tasks for those entries.
+  vector<scoped_refptr<TableInfo>> copy;
   {
     shared_lock<LockType> l(lock_);
-    copy = table_ids_map_;
-  }
-  for (const TableInfoMap::value_type &e : copy) {
-    e.second->AbortTasks();
-    e.second->WaitTasksCompletion();
+    AppendValuesFromMap(table_ids_map_, &copy);
   }
+  AbortAndWaitForAllTasks(copy);
 
   // Wait for any outstanding table visitors to finish.
   //
@@ -1508,9 +1499,6 @@ Status CatalogManager::HandleReportedTablet(TSDescriptor* ts_desc,
     shared_lock<LockType> l(lock_);
     tablet = FindPtrOrNull(tablet_map_, report.tablet_id());
   }
-  RETURN_NOT_OK_PREPEND(CheckIsLeaderAndReady(),
-      Substitute("This master is no longer the leader, unable to handle report for tablet $0",
-                 report.tablet_id()));
   if (!tablet) {
     LOG(INFO) << "Got report from unknown tablet " << report.tablet_id()
               << ": Sending delete request for this orphan tablet";
@@ -2101,7 +2089,7 @@ class AsyncCreateReplica : public RetrySpecificTSRpcTask {
                      const string& permanent_uuid,
                      const scoped_refptr<TabletInfo>& tablet,
                      const TabletMetadataLock& tablet_lock)
-    : RetrySpecificTSRpcTask(master, permanent_uuid, tablet->table().get()),
+    : RetrySpecificTSRpcTask(master, permanent_uuid, tablet->table()),
       tablet_id_(tablet->tablet_id()) {
     deadline_ = start_ts_;
     deadline_.AddDelta(MonoDelta::FromMilliseconds(FLAGS_tablet_creation_timeout_ms));
@@ -2261,7 +2249,7 @@ class AsyncAlterTable : public RetryingTSRpcTask {
                   const scoped_refptr<TabletInfo>& tablet)
     : RetryingTSRpcTask(master,
                         gscoped_ptr<TSPicker>(new PickLeaderReplica(tablet)),
-                        tablet->table().get()),
+                        tablet->table()),
       tablet_(tablet) {
   }
 
@@ -3138,6 +3126,108 @@ std::string CatalogManager::LogPrefix() const {
                     sys_catalog_->tablet_peer()->permanent_uuid());
 }
 
+void CatalogManager::AbortAndWaitForAllTasks(
+    const vector<scoped_refptr<TableInfo>>& tables) {
+  for (const auto& t : tables) {
+    t->AbortTasks();
+  }
+  for (const auto& t : tables) {
+    t->WaitTasksCompletion();
+  }
+}
+
+////////////////////////////////////////////////////////////
+// CatalogManager::ScopedLeaderSharedLock
+////////////////////////////////////////////////////////////
+
+CatalogManager::ScopedLeaderSharedLock::ScopedLeaderSharedLock(
+    CatalogManager* catalog)
+    : catalog_(DCHECK_NOTNULL(catalog)),
+      leader_shared_lock_(catalog->leader_lock_, std::try_to_lock) {
+
+  // Check if the catalog manager is running.
+  std::lock_guard<simple_spinlock> l(catalog_->state_lock_);
+  if (PREDICT_FALSE(catalog_->state_ != kRunning)) {
+    catalog_status_ = Status::ServiceUnavailable(
+        Substitute("Catalog manager is not initialized. State: $0",
+                   catalog_->state_));
+    return;
+  }
+
+  // Check if the catalog manager is the leader.
+  Consensus* consensus = catalog_->sys_catalog_->tablet_peer_->consensus();
+  ConsensusStatePB cstate = consensus->ConsensusState(CONSENSUS_CONFIG_COMMITTED);
+  string uuid = catalog_->master_->fs_manager()->uuid();
+  if (PREDICT_FALSE(!cstate.has_leader_uuid() || cstate.leader_uuid() != uuid)) {
+    leader_status_ = Status::IllegalState(
+        Substitute("Not the leader. Local UUID: $0, Consensus state: $1",
+                   uuid, cstate.ShortDebugString()));
+    return;
+  }
+  if (PREDICT_FALSE(catalog_->leader_ready_term_ != cstate.current_term() ||
+                    !leader_shared_lock_.owns_lock())) {
+    leader_status_ = Status::ServiceUnavailable(
+        "Leader not yet ready to serve requests");
+    return;
+  }
+}
+
+template<typename RespClass>
+bool CatalogManager::ScopedLeaderSharedLock::CheckIsInitializedOrRespond(
+    RespClass* resp, RpcContext* rpc) {
+  if (PREDICT_FALSE(!catalog_status_.ok())) {
+    StatusToPB(catalog_status_, resp->mutable_error()->mutable_status());
+    resp->mutable_error()->set_code(
+        MasterErrorPB::CATALOG_MANAGER_NOT_INITIALIZED);
+    rpc->RespondSuccess();
+    return false;
+  }
+  return true;
+}
+
+template<typename RespClass>
+bool CatalogManager::ScopedLeaderSharedLock::CheckIsInitializedAndIsLeaderOrRespond(
+    RespClass* resp, RpcContext* rpc) {
+  Status& s = catalog_status_;
+  if (PREDICT_TRUE(s.ok())) {
+    s = leader_status_;
+  }
+  if (PREDICT_TRUE(s.ok())) {
+    return true;
+  }
+
+  StatusToPB(s, resp->mutable_error()->mutable_status());
+  resp->mutable_error()->set_code(MasterErrorPB::NOT_THE_LEADER);
+  rpc->RespondSuccess();
+  return false;
+}
+
+// Explicit specialization for callers outside this compilation unit.
+#define INITTED_OR_RESPOND(RespClass) \
+  template bool \
+  CatalogManager::ScopedLeaderSharedLock::CheckIsInitializedOrRespond( \
+      RespClass* resp, RpcContext* rpc)
+#define INITTED_AND_LEADER_OR_RESPOND(RespClass) \
+  template bool \
+  CatalogManager::ScopedLeaderSharedLock::CheckIsInitializedAndIsLeaderOrRespond( \
+      RespClass* resp, RpcContext* rpc)
+
+INITTED_OR_RESPOND(GetMasterRegistrationResponsePB);
+INITTED_OR_RESPOND(TSHeartbeatResponsePB);
+INITTED_AND_LEADER_OR_RESPOND(AlterTableResponsePB);
+INITTED_AND_LEADER_OR_RESPOND(CreateTableResponsePB);
+INITTED_AND_LEADER_OR_RESPOND(DeleteTableResponsePB);
+INITTED_AND_LEADER_OR_RESPOND(IsAlterTableDoneResponsePB);
+INITTED_AND_LEADER_OR_RESPOND(IsCreateTableDoneResponsePB);
+INITTED_AND_LEADER_OR_RESPOND(ListTablesResponsePB);
+INITTED_AND_LEADER_OR_RESPOND(ListTabletServersResponsePB);
+INITTED_AND_LEADER_OR_RESPOND(GetTableLocationsResponsePB);
+INITTED_AND_LEADER_OR_RESPOND(GetTableSchemaResponsePB);
+INITTED_AND_LEADER_OR_RESPOND(GetTabletLocationsResponsePB);
+
+#undef INITTED_OR_RESPOND
+#undef INITTED_AND_LEADER_OR_RESPOND
+
 ////////////////////////////////////////////////////////////
 // TabletInfo
 ////////////////////////////////////////////////////////////
@@ -3282,14 +3372,21 @@ bool TableInfo::IsCreateInProgress() const {
 }
 
 void TableInfo::AddTask(MonitoredTask* task) {
-  std::lock_guard<simple_spinlock> l(lock_);
   task->AddRef();
-  pending_tasks_.insert(task);
+  {
+    std::lock_guard<simple_spinlock> l(lock_);
+    pending_tasks_.insert(task);
+  }
 }
 
 void TableInfo::RemoveTask(MonitoredTask* task) {
-  std::lock_guard<simple_spinlock> l(lock_);
-  pending_tasks_.erase(task);
+  {
+    std::lock_guard<simple_spinlock> l(lock_);
+    pending_tasks_.erase(task);
+  }
+
+  // Done outside the lock so that if Release() drops the last ref to this
+  // TableInfo, RemoveTask() won't unlock a freed lock.
   task->Release();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d1d3cfe7/src/kudu/master/catalog_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index bc2e8e1..d6257b4 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -38,12 +38,14 @@
 #include "kudu/util/oid_generator.h"
 #include "kudu/util/promise.h"
 #include "kudu/util/random.h"
+#include "kudu/util/rw_mutex.h"
 #include "kudu/util/status.h"
 
 namespace kudu {
 
 class Schema;
 class ThreadPool;
+class CreateTableStressTest_TestConcurrentCreateTableAndReloadMetadata_Test;
 
 namespace rpc {
 class RpcContext;
@@ -268,6 +270,73 @@ typedef MetadataLock<TableInfo> TableMetadataLock;
 // Thread-safe.
 class CatalogManager : public tserver::TabletPeerLookupIf {
  public:
+
+  // Scoped "shared lock" to serialize master leader elections.
+  //
+  // While in scope, blocks the catalog manager in the event that it becomes
+  // the leader of its Raft configuration and needs to reload its persistent
+  // metadata. Once destroyed, the catalog manager is unblocked.
+  //
+  // Usage:
+  //
+  // void MasterServiceImpl::CreateTable(const CreateTableRequestPB* req,
+  //                                     CreateTableResponsePB* resp,
+  //                                     rpc::RpcContext* rpc) {
+  //   CatalogManager::ScopedLeaderSharedLock l(server_->catalog_manager());
+  //   if (!l.CheckIsInitializedAndIsLeaderOrRespond(resp, rpc)) {
+  //     return;
+  //   }
+  //
+  //   Status s = server_->catalog_manager()->CreateTable(req, resp, rpc);
+  //   CheckRespErrorOrSetUnknown(s, resp);
+  //   rpc->RespondSuccess();
+  // }
+  //
+  class ScopedLeaderSharedLock {
+   public:
+    // Creates a new shared lock, acquiring the catalog manager's leader_lock_
+    // for reading in the process. The lock is released when this object is
+    // destroyed.
+    //
+    // 'catalog' must outlive this object.
+    explicit ScopedLeaderSharedLock(CatalogManager* catalog);
+
+    // General status of the catalog manager. If not OK (e.g. the catalog
+    // manager is still being initialized), all operations are illegal and
+    // leader_status() should not be trusted.
+    const Status& catalog_status() const { return catalog_status_; }
+
+    // Leadership status of the catalog manager. If not OK, the catalog
+    // manager is not the leader, but some operations may still be legal.
+    const Status& leader_status() const {
+      DCHECK(catalog_status_.ok());
+      return leader_status_;
+    }
+
+    // Check that the catalog manager is initialized. It may or may not be the
+    // leader of its Raft configuration.
+    //
+    // If not initialized, writes the corresponding error to 'resp',
+    // responds to 'rpc', and returns false.
+    template<typename RespClass>
+    bool CheckIsInitializedOrRespond(RespClass* resp, rpc::RpcContext* rpc);
+
+    // Check that the catalog manager is initialized and that it is the leader
+    // of its Raft configuration. Initialization status takes precedence over
+    // leadership status.
+    //
+    // If not initialized or if not the leader, writes the corresponding error
+    // to 'resp', responds to 'rpc', and returns false.
+    template<typename RespClass>
+    bool CheckIsInitializedAndIsLeaderOrRespond(RespClass* resp, rpc::RpcContext* rpc);
+
+   private:
+    CatalogManager* catalog_;
+    shared_lock<RWMutex> leader_shared_lock_;
+    Status catalog_status_;
+    Status leader_status_;
+  };
+
   explicit CatalogManager(Master *master);
   virtual ~CatalogManager();
 
@@ -381,12 +450,6 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
       const consensus::StartRemoteBootstrapRequestPB& req,
       boost::optional<kudu::tserver::TabletServerErrorPB::Code>* error_code) OVERRIDE;
 
-  // Return OK if this CatalogManager is a leader in a consensus configuration and if
-  // the required leader state (metadata for tables and tablets) has
-  // been successfully loaded into memory. CatalogManager must be
-  // initialized before calling this method.
-  Status CheckIsLeaderAndReady() const;
-
   // Returns this CatalogManager's role in a consensus configuration. CatalogManager
   // must be initialized before calling this method.
   consensus::RaftPeerPB::Role Role() const;
@@ -396,9 +459,15 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
   FRIEND_TEST(MasterTest, TestShutdownDuringTableVisit);
   FRIEND_TEST(MasterTest, TestGetTableLocationsDuringRepeatedTableVisit);
 
+  // This test calls VisitTablesAndTablets() directly.
+  FRIEND_TEST(kudu::CreateTableStressTest, TestConcurrentCreateTableAndReloadMetadata);
+
   friend class TableLoader;
   friend class TabletLoader;
 
+  typedef std::unordered_map<std::string, scoped_refptr<TableInfo>> TableInfoMap;
+  typedef std::unordered_map<std::string, scoped_refptr<TabletInfo>> TabletInfoMap;
+
   // Called by SysCatalog::SysCatalogStateChanged when this node
   // becomes the leader of a consensus configuration. Executes VisitTablesAndTabletsTask
   // via 'worker_pool_'.
@@ -567,6 +636,9 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
   // Conventional "T xxx P yyy: " prefix for logging.
   std::string LogPrefix() const;
 
+  // Aborts all tasks belonging to 'tables' and waits for them to finish.
+  void AbortAndWaitForAllTasks(const std::vector<scoped_refptr<TableInfo>>& tables);
+
   // TODO: the maps are a little wasteful of RAM, since the TableInfo/TabletInfo
   // objects have a copy of the string key. But STL doesn't make it
   // easy to make a "gettable set".
@@ -576,12 +648,10 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
   mutable LockType lock_;
 
   // Table maps: table-id -> TableInfo and table-name -> TableInfo
-  typedef std::unordered_map<std::string, scoped_refptr<TableInfo> > TableInfoMap;
   TableInfoMap table_ids_map_;
   TableInfoMap table_names_map_;
 
   // Tablet maps: tablet-id -> TabletInfo
-  typedef std::unordered_map<std::string, scoped_refptr<TabletInfo> > TabletInfoMap;
   TabletInfoMap tablet_map_;
 
   // Names of tables that are currently being created. Only used in
@@ -624,6 +694,17 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
   // correctly.
   int64_t leader_ready_term_;
 
+  // Lock used to fence operations and leader elections. All logical operations
+  // (i.e. create table, alter table, etc.) should acquire this lock for
+  // reading. Following an election where this master is elected leader, it
+  // should acquire this lock for writing before reloading the metadata.
+  //
+  // Readers should not acquire this lock directly; use ScopedLeadershipLock
+  // instead.
+  //
+  // Always acquire this lock before state_lock_.
+  RWMutex leader_lock_;
+
   // Async operations are accessing some private methods
   // (TODO: this stuff should be deferred and done in the background thread)
   friend class AsyncAlterTable;

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d1d3cfe7/src/kudu/master/master.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/master.cc b/src/kudu/master/master.cc
index 8261653..bbfb701 100644
--- a/src/kudu/master/master.cc
+++ b/src/kudu/master/master.cc
@@ -158,9 +158,11 @@ Status Master::WaitUntilCatalogManagerIsLeaderAndReadyForTests(const MonoDelta&
   int backoff_ms = 1;
   const int kMaxBackoffMs = 256;
   do {
-    s = catalog_manager_->CheckIsLeaderAndReady();
-    if (s.ok()) {
-      return Status::OK();
+    {
+      CatalogManager::ScopedLeaderSharedLock l(catalog_manager_.get());
+      if (l.catalog_status().ok() && l.leader_status().ok()) {
+        return Status::OK();
+      }
     }
     SleepFor(MonoDelta::FromMilliseconds(backoff_ms));
     backoff_ms = min(backoff_ms << 1, kMaxBackoffMs);

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d1d3cfe7/src/kudu/master/master_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/master_service.cc b/src/kudu/master/master_service.cc
index a94c489..15af286 100644
--- a/src/kudu/master/master_service.cc
+++ b/src/kudu/master/master_service.cc
@@ -48,42 +48,6 @@ using std::shared_ptr;
 
 namespace {
 
-template<class RespClass>
-bool CheckCatalogManagerInitializedOrRespond(Master* master,
-                                             RespClass* resp,
-                                             rpc::RpcContext* rpc) {
-  if (PREDICT_FALSE(!master->catalog_manager()->IsInitialized())) {
-    SetupErrorAndRespond(resp->mutable_error(),
-                         Status::ServiceUnavailable("catalog manager has not been initialized"),
-                         MasterErrorPB::CATALOG_MANAGER_NOT_INITIALIZED,
-                         rpc);
-    return false;
-  }
-  return true;
-}
-
-template<class RespClass>
-bool CheckIsLeaderOrRespond(Master* master,
-                            RespClass* resp,
-                            rpc::RpcContext* rpc) {
-  Status s = master->catalog_manager()->CheckIsLeaderAndReady();
-  if (PREDICT_FALSE(!s.ok())) {
-    SetupErrorAndRespond(resp->mutable_error(), s,
-                         MasterErrorPB::NOT_THE_LEADER,
-                         rpc);
-    return false;
-  }
-  return true;
-}
-
-template<class RespClass>
-bool CheckLeaderAndCatalogManagerInitializedOrRespond(Master* master,
-                                                      RespClass* resp,
-                                                      rpc::RpcContext* rpc) {
-  return PREDICT_TRUE(CheckCatalogManagerInitializedOrRespond(master, resp, rpc) &&
-                      CheckIsLeaderOrRespond(master, resp, rpc));
-}
-
 // If 's' is not OK and 'resp' has no application specific error set,
 // set the error field of 'resp' to match 's' and set the code to
 // UNKNOWN_ERROR.
@@ -97,17 +61,6 @@ void CheckRespErrorOrSetUnknown(const Status& s, RespClass* resp) {
 
 } // anonymous namespace
 
-static void SetupErrorAndRespond(MasterErrorPB* error,
-                                 const Status& s,
-                                 MasterErrorPB::Code code,
-                                 rpc::RpcContext* rpc) {
-  StatusToPB(s, error->mutable_status());
-  error->set_code(code);
-  // TODO RespondSuccess() is better called 'Respond'.
-  rpc->RespondSuccess();
-}
-
-
 MasterServiceImpl::MasterServiceImpl(Master* server)
   : MasterServiceIf(server->metric_entity()),
     server_(server) {
@@ -125,13 +78,13 @@ void MasterServiceImpl::TSHeartbeat(const TSHeartbeatRequestPB* req,
   // If CatalogManager is not initialized don't even know whether
   // or not we will be a leader (so we can't tell whether or not we can
   // accept tablet reports).
-  if (!CheckCatalogManagerInitializedOrRespond(server_, resp, rpc)) {
+  CatalogManager::ScopedLeaderSharedLock l(server_->catalog_manager());
+  if (!l.CheckIsInitializedOrRespond(resp, rpc)) {
     return;
   }
 
   resp->mutable_master_instance()->CopyFrom(server_->instance_pb());
-  Status s = server_->catalog_manager()->CheckIsLeaderAndReady();
-  if (!s.ok()) {
+  if (!l.leader_status().ok()) {
     // For the time being, ignore heartbeats sent to non-leader distributed
     // masters.
     //
@@ -140,7 +93,7 @@ void MasterServiceImpl::TSHeartbeat(const TSHeartbeatRequestPB* req,
     // masters, or by storing heartbeat information in a replicated
     // SysTable.
     LOG(WARNING) << "Received a heartbeat, but this Master instance is not a leader or a "
-                 << "single Master: " << s.ToString();
+                 << "single Master: " << l.leader_status().ToString();
     resp->set_leader_master(false);
     rpc->RespondSuccess();
     return;
@@ -167,7 +120,7 @@ void MasterServiceImpl::TSHeartbeat(const TSHeartbeatRequestPB* req,
 
   // Look up the TS -- if it just registered above, it will be found here.
   // This allows the TS to register and tablet-report in the same RPC.
-  s = server_->ts_manager()->LookupTS(req->common().ts_instance(), &ts_desc);
+  Status s = server_->ts_manager()->LookupTS(req->common().ts_instance(), &ts_desc);
   if (s.IsNotFound()) {
     LOG(INFO) << "Got heartbeat from  unknown tablet server { "
               << req->common().ts_instance().ShortDebugString()
@@ -207,7 +160,8 @@ void MasterServiceImpl::TSHeartbeat(const TSHeartbeatRequestPB* req,
 void MasterServiceImpl::GetTabletLocations(const GetTabletLocationsRequestPB* req,
                                            GetTabletLocationsResponsePB* resp,
                                            rpc::RpcContext* rpc) {
-  if (!CheckLeaderAndCatalogManagerInitializedOrRespond(server_, resp, rpc)) {
+  CatalogManager::ScopedLeaderSharedLock l(server_->catalog_manager());
+  if (!l.CheckIsInitializedAndIsLeaderOrRespond(resp, rpc)) {
     return;
   }
 
@@ -236,7 +190,8 @@ void MasterServiceImpl::GetTabletLocations(const GetTabletLocationsRequestPB* re
 void MasterServiceImpl::CreateTable(const CreateTableRequestPB* req,
                                     CreateTableResponsePB* resp,
                                     rpc::RpcContext* rpc) {
-  if (!CheckLeaderAndCatalogManagerInitializedOrRespond(server_, resp, rpc)) {
+  CatalogManager::ScopedLeaderSharedLock l(server_->catalog_manager());
+  if (!l.CheckIsInitializedAndIsLeaderOrRespond(resp, rpc)) {
     return;
   }
 
@@ -248,7 +203,8 @@ void MasterServiceImpl::CreateTable(const CreateTableRequestPB* req,
 void MasterServiceImpl::IsCreateTableDone(const IsCreateTableDoneRequestPB* req,
                                           IsCreateTableDoneResponsePB* resp,
                                           rpc::RpcContext* rpc) {
-  if (!CheckLeaderAndCatalogManagerInitializedOrRespond(server_, resp, rpc)) {
+  CatalogManager::ScopedLeaderSharedLock l(server_->catalog_manager());
+  if (!l.CheckIsInitializedAndIsLeaderOrRespond(resp, rpc)) {
     return;
   }
 
@@ -260,7 +216,8 @@ void MasterServiceImpl::IsCreateTableDone(const IsCreateTableDoneRequestPB* req,
 void MasterServiceImpl::DeleteTable(const DeleteTableRequestPB* req,
                                     DeleteTableResponsePB* resp,
                                     rpc::RpcContext* rpc) {
-  if (!CheckLeaderAndCatalogManagerInitializedOrRespond(server_, resp, rpc)) {
+  CatalogManager::ScopedLeaderSharedLock l(server_->catalog_manager());
+  if (!l.CheckIsInitializedAndIsLeaderOrRespond(resp, rpc)) {
     return;
   }
 
@@ -272,7 +229,8 @@ void MasterServiceImpl::DeleteTable(const DeleteTableRequestPB* req,
 void MasterServiceImpl::AlterTable(const AlterTableRequestPB* req,
                                    AlterTableResponsePB* resp,
                                    rpc::RpcContext* rpc) {
-  if (!CheckLeaderAndCatalogManagerInitializedOrRespond(server_, resp, rpc)) {
+  CatalogManager::ScopedLeaderSharedLock l(server_->catalog_manager());
+  if (!l.CheckIsInitializedAndIsLeaderOrRespond(resp, rpc)) {
     return;
   }
 
@@ -284,7 +242,8 @@ void MasterServiceImpl::AlterTable(const AlterTableRequestPB* req,
 void MasterServiceImpl::IsAlterTableDone(const IsAlterTableDoneRequestPB* req,
                                          IsAlterTableDoneResponsePB* resp,
                                          rpc::RpcContext* rpc) {
-  if (!CheckLeaderAndCatalogManagerInitializedOrRespond(server_, resp, rpc)) {
+  CatalogManager::ScopedLeaderSharedLock l(server_->catalog_manager());
+  if (!l.CheckIsInitializedAndIsLeaderOrRespond(resp, rpc)) {
     return;
   }
 
@@ -296,7 +255,8 @@ void MasterServiceImpl::IsAlterTableDone(const IsAlterTableDoneRequestPB* req,
 void MasterServiceImpl::ListTables(const ListTablesRequestPB* req,
                                    ListTablesResponsePB* resp,
                                    rpc::RpcContext* rpc) {
-  if (!CheckLeaderAndCatalogManagerInitializedOrRespond(server_, resp, rpc)) {
+  CatalogManager::ScopedLeaderSharedLock l(server_->catalog_manager());
+  if (!l.CheckIsInitializedAndIsLeaderOrRespond(resp, rpc)) {
     return;
   }
 
@@ -308,9 +268,11 @@ void MasterServiceImpl::ListTables(const ListTablesRequestPB* req,
 void MasterServiceImpl::GetTableLocations(const GetTableLocationsRequestPB* req,
                                           GetTableLocationsResponsePB* resp,
                                           rpc::RpcContext* rpc) {
-  if (!CheckLeaderAndCatalogManagerInitializedOrRespond(server_, resp, rpc)) {
+  CatalogManager::ScopedLeaderSharedLock l(server_->catalog_manager());
+  if (!l.CheckIsInitializedAndIsLeaderOrRespond(resp, rpc)) {
     return;
   }
+
   if (PREDICT_FALSE(FLAGS_master_inject_latency_on_tablet_lookups_ms > 0)) {
     SleepFor(MonoDelta::FromMilliseconds(FLAGS_master_inject_latency_on_tablet_lookups_ms));
   }
@@ -322,7 +284,8 @@ void MasterServiceImpl::GetTableLocations(const GetTableLocationsRequestPB* req,
 void MasterServiceImpl::GetTableSchema(const GetTableSchemaRequestPB* req,
                                        GetTableSchemaResponsePB* resp,
                                        rpc::RpcContext* rpc) {
-  if (!CheckLeaderAndCatalogManagerInitializedOrRespond(server_, resp, rpc)) {
+  CatalogManager::ScopedLeaderSharedLock l(server_->catalog_manager());
+  if (!l.CheckIsInitializedAndIsLeaderOrRespond(resp, rpc)) {
     return;
   }
 
@@ -334,7 +297,8 @@ void MasterServiceImpl::GetTableSchema(const GetTableSchemaRequestPB* req,
 void MasterServiceImpl::ListTabletServers(const ListTabletServersRequestPB* req,
                                           ListTabletServersResponsePB* resp,
                                           rpc::RpcContext* rpc) {
-  if (!CheckLeaderAndCatalogManagerInitializedOrRespond(server_, resp, rpc)) {
+  CatalogManager::ScopedLeaderSharedLock l(server_->catalog_manager());
+  if (!l.CheckIsInitializedAndIsLeaderOrRespond(resp, rpc)) {
     return;
   }
 
@@ -370,9 +334,12 @@ void MasterServiceImpl::GetMasterRegistration(const GetMasterRegistrationRequest
                                               rpc::RpcContext* rpc) {
   // instance_id must always be set in order for status pages to be useful.
   resp->mutable_instance_id()->CopyFrom(server_->instance_pb());
-  if (!CheckCatalogManagerInitializedOrRespond(server_, resp, rpc)) {
+
+  CatalogManager::ScopedLeaderSharedLock l(server_->catalog_manager());
+  if (!l.CheckIsInitializedOrRespond(resp, rpc)) {
     return;
   }
+
   Status s = server_->GetMasterRegistration(resp->mutable_registration());
   CheckRespErrorOrSetUnknown(s, resp);
   resp->set_role(server_->catalog_manager()->Role());



[2/2] incubator-kudu git commit: master: fix corruption when AlterTable() races with CreateTable()

Posted by ad...@apache.org.
master: fix corruption when AlterTable() races with CreateTable()

Admittedly, this is a contrived scenario:
1. T1 tries to create a table named 'foo'
2. T2 tries to rename the table named 'bar' to 'foo'

With just the right timing, both operations succeed and the metadata now has
two tables named 'foo', each with a different table ID. The fix is simple:
generalize the "tables being created" logic already used by CreateTable().

Without the fix, the new test failed roughly every 50th run. With the fix,
it does not fail in 1000 runs.

Change-Id: I6c9e4214c09bc47a5a10b12d6ffe8b35906708c9
Reviewed-on: http://gerrit.cloudera.org:8080/3607
Reviewed-by: Dan Burkert <da...@cloudera.com>
Tested-by: Adar Dembo <ad...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/05369115
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/05369115
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/05369115

Branch: refs/heads/master
Commit: 05369115d4c4f73eb5dec53ef155ea5fabdf8c87
Parents: d1d3cfe
Author: Adar Dembo <ad...@cloudera.com>
Authored: Fri Jul 8 18:56:23 2016 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Thu Jul 14 21:10:10 2016 +0000

----------------------------------------------------------------------
 src/kudu/master/catalog_manager.cc |  80 ++++++++++------
 src/kudu/master/catalog_manager.h  |  16 +++-
 src/kudu/master/master-test.cc     | 165 ++++++++++++++++++++++++++++++++
 3 files changed, 231 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/05369115/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 2d09d82..88bd3e5 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -867,23 +867,25 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req,
     // b. Verify that the table does not exist.
     table = FindPtrOrNull(table_names_map_, req.name());
     if (table != nullptr) {
-      s = Status::AlreadyPresent("Table already exists", table->id());
+      s = Status::AlreadyPresent(Substitute("Table $0 already exists with id $1",
+                                 req.name(), table->id()));
       SetupError(resp->mutable_error(), MasterErrorPB::TABLE_ALREADY_PRESENT, s);
       return s;
     }
 
-    // c. Mark the table as being created (if it isn't already).
-    if (!InsertIfNotPresent(&tables_being_created_, req.name())) {
-      s = Status::ServiceUnavailable("Table is currently being created", req.name());
+    // c. Reserve the table name if possible.
+    if (!InsertIfNotPresent(&reserved_table_names_, req.name())) {
+      s = Status::ServiceUnavailable(Substitute(
+          "New table name $0 is already reserved", req.name()));
       SetupError(resp->mutable_error(), MasterErrorPB::TABLE_NOT_FOUND, s);
       return s;
     }
   }
 
-  // Ensure that if we return, we mark this table as no longer being created.
+  // Ensure that we drop the name reservation upon return.
   auto cleanup = MakeScopedCleanup([&] () {
     std::lock_guard<LockType> l(lock_);
-    CHECK_EQ(1, tables_being_created_.erase(req.name()));
+    CHECK_EQ(1, reserved_table_names_.erase(req.name()));
   });
 
   // d. Create the in-memory representation of the new table and its tablets.
@@ -1204,7 +1206,7 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
 
   RETURN_NOT_OK(CheckOnline());
 
-  // 1. Lookup the table and verify if it exists
+  // 1. Lookup the table and verify if it exists.
   TRACE("Looking up table");
   scoped_refptr<TableInfo> table;
   RETURN_NOT_OK(FindTable(req->table(), &table));
@@ -1222,10 +1224,22 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
     return s;
   }
 
+  // 2. Having locked the table, look it up again, in case we raced with
+  //    another AlterTable() that renamed our table.
+  {
+    scoped_refptr<TableInfo> table_again;
+    CHECK_OK(FindTable(req->table(), &table_again));
+    if (table_again == nullptr) {
+      Status s = Status::NotFound("The table does not exist", req->table().DebugString());
+      SetupError(resp->mutable_error(), MasterErrorPB::TABLE_NOT_FOUND, s);
+      return s;
+    }
+  }
+
   bool has_changes = false;
   string table_name = l.data().name();
 
-  // 2. Calculate new schema for the on-disk state, not persisted yet
+  // 3. Calculate new schema for the on-disk state, not persisted yet.
   Schema new_schema;
   ColumnId next_col_id = ColumnId(l.data().pb.next_column_id());
   if (req->alter_schema_steps_size()) {
@@ -1241,33 +1255,46 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
     has_changes = true;
   }
 
-  // 3. Try to acquire the new table name
+  // 4. Try to acquire the new table name.
   if (req->has_new_table_name()) {
     std::lock_guard<LockType> catalog_lock(lock_);
-
     TRACE("Acquired catalog manager lock");
 
-    // Verify that the table does not exist
+    // Verify that the table does not exist.
     scoped_refptr<TableInfo> other_table = FindPtrOrNull(table_names_map_, req->new_table_name());
     if (other_table != nullptr) {
-      Status s = Status::AlreadyPresent("Table already exists", other_table->id());
+      Status s = Status::AlreadyPresent(Substitute("Table $0 already exists with id $1",
+                                                   req->new_table_name(), table->id()));
       SetupError(resp->mutable_error(), MasterErrorPB::TABLE_ALREADY_PRESENT, s);
       return s;
     }
 
-    // Acquire the new table name (now we have 2 name for the same table)
-    table_names_map_[req->new_table_name()] = table;
-    l.mutable_data()->pb.set_name(req->new_table_name());
+    // Reserve the new table name if possible.
+    if (!InsertIfNotPresent(&reserved_table_names_, req->new_table_name())) {
+      Status s = Status::ServiceUnavailable(Substitute(
+          "Table name $0 is already reserved", req->new_table_name()));
+      SetupError(resp->mutable_error(), MasterErrorPB::TABLE_NOT_FOUND, s);
+      return s;
+    }
 
+    l.mutable_data()->pb.set_name(req->new_table_name());
     has_changes = true;
   }
 
+  // Ensure that we drop our reservation upon return.
+  auto cleanup = MakeScopedCleanup([&] () {
+    if (req->has_new_table_name()) {
+      std::lock_guard<LockType> l(lock_);
+      CHECK_EQ(1, reserved_table_names_.erase(req->new_table_name()));
+    }
+  });
+
   // Skip empty requests...
   if (!has_changes) {
     return Status::OK();
   }
 
-  // 4. Serialize the schema Increment the version number
+  // 5. Serialize the schema Increment the version number.
   if (new_schema.initialized()) {
     if (!l.data().pb.has_fully_applied_schema()) {
       l.mutable_data()->pb.mutable_fully_applied_schema()->CopyFrom(l.data().pb.schema());
@@ -1281,7 +1308,7 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
                                          l.mutable_data()->pb.version(),
                                          LocalTimeAsString()));
 
-  // 5. Update sys-catalog with the new table schema.
+  // 6. Update sys-catalog with the new table schema.
   TRACE("Updating metadata on disk");
   SysCatalogTable::Actions actions;
   actions.table_to_update = table.get();
@@ -1291,24 +1318,23 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
         Substitute("An error occurred while updating sys-catalog tables entry: $0",
                    s.ToString()));
     LOG(WARNING) << s.ToString();
-    if (req->has_new_table_name()) {
-      std::lock_guard<LockType> catalog_lock(lock_);
-      CHECK_EQ(table_names_map_.erase(req->new_table_name()), 1);
-    }
     CheckIfNoLongerLeaderAndSetupError(s, resp);
     return s;
   }
 
-  // 6. Remove the old name
+  // 7. Remove the old name and add the new name.
   if (req->has_new_table_name()) {
-    TRACE("Removing old-name $0 from by-name map", table_name);
+    TRACE("Replacing name $0 with $1 in by-name table map",
+          table_name, req->new_table_name());
     std::lock_guard<LockType> l_map(lock_);
     if (table_names_map_.erase(table_name) != 1) {
-      PANIC_RPC(rpc, "Could not remove table from map, name=" + l.data().name());
+      PANIC_RPC(rpc, Substitute(
+          "Could not remove table (name $0) from map", table_name));
     }
+    InsertOrDie(&table_names_map_, req->new_table_name(), table);
   }
 
-  // 7. Update the in-memory state
+  // 8. Update the in-memory state.
   TRACE("Committing in-memory state");
   l.Commit();
 
@@ -2736,8 +2762,8 @@ Status CatalogManager::ProcessPendingAssignments(
     s = SelectReplicasForTablet(ts_descs, tablet);
     if (!s.ok()) {
       s = s.CloneAndPrepend(Substitute(
-          "An error occured while selecting replicas for tablet $0: $1",
-          tablet->tablet_id(), s.ToString()));
+          "An error occured while selecting replicas for tablet $0",
+          tablet->tablet_id()));
       break;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/05369115/src/kudu/master/catalog_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index d6257b4..44f84c0 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -654,9 +654,19 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
   // Tablet maps: tablet-id -> TabletInfo
   TabletInfoMap tablet_map_;
 
-  // Names of tables that are currently being created. Only used in
-  // table creation so that transient tables are not made visible.
-  std::unordered_set<std::string> tables_being_created_;
+  // Names of tables that are currently reserved by CreateTable() or
+  // AlterTable().
+  //
+  // As a rule, operations that add new table names should do so as follows:
+  // 1. Acquire lock_.
+  // 2. Ensure table_names_map_ does not contain the new name.
+  // 3. Ensure reserved_table_names_ does not contain the new name.
+  // 4. Add the new name to reserved_table_names_.
+  // 5. Release lock_.
+  // 6. Perform the operation.
+  // 7. If it succeeded, add the name to table_names_map_ with lock_ held.
+  // 8. Remove the new name from reserved_table_names_ with lock_ held.
+  std::unordered_set<std::string> reserved_table_names_;
 
   Master *master_;
   Atomic32 closing_;

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/05369115/src/kudu/master/master-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/master-test.cc b/src/kudu/master/master-test.cc
index 015f2f8..87fb301 100644
--- a/src/kudu/master/master-test.cc
+++ b/src/kudu/master/master-test.cc
@@ -1027,5 +1027,170 @@ TEST_F(MasterTest, TestConcurrentCreateOfSameTable) {
   }
 }
 
+TEST_F(MasterTest, TestConcurrentRenameOfSameTable) {
+  const char* kOldName = "testtb";
+  const char* kNewName = "testtb-new";
+  const Schema kTableSchema({ ColumnSchema("key", INT32),
+                              ColumnSchema("v1", UINT64),
+                              ColumnSchema("v2", STRING) },
+                            1);
+  ASSERT_OK(CreateTable(kOldName, kTableSchema));
+
+  // Kick off a bunch of threads all trying to rename the same table.
+  vector<thread> threads;
+  for (int i = 0; i < 10; i++) {
+    threads.emplace_back([&]() {
+      AlterTableRequestPB req;
+      AlterTableResponsePB resp;
+      RpcController controller;
+
+      req.mutable_table()->set_table_name(kOldName);
+      req.set_new_table_name(kNewName);
+      CHECK_OK(proxy_->AlterTable(req, &resp, &controller));
+      SCOPED_TRACE(resp.DebugString());
+
+      // There are two expected outcomes:
+      //
+      // 1. This thread won the AlterTable() race: no error.
+      // 2. This thread lost the AlterTable() race: TABLE_NOT_FOUND error
+      //    with NotFound status.
+      if (resp.has_error()) {
+        Status s = StatusFromPB(resp.error().status());
+        string failure_msg = Substitute("Unexpected response: $0",
+                                        resp.DebugString());
+        CHECK_EQ(MasterErrorPB::TABLE_NOT_FOUND, resp.error().code()) << failure_msg;
+        CHECK(s.IsNotFound()) << failure_msg;
+      }
+    });
+  }
+
+  for (auto& t : threads) {
+    t.join();
+  }
+}
+
+TEST_F(MasterTest, TestConcurrentCreateAndRenameOfSameTable) {
+  const char* kOldName = "testtb";
+  const char* kNewName = "testtb-new";
+  const Schema kTableSchema({ ColumnSchema("key", INT32),
+                              ColumnSchema("v1", UINT64),
+                              ColumnSchema("v2", STRING) },
+                            1);
+  ASSERT_OK(CreateTable(kOldName, kTableSchema));
+
+  AtomicBool create_success(false);
+  AtomicBool rename_success(false);
+  vector<thread> threads;
+  for (int i = 0; i < 10; i++) {
+    if (i % 2) {
+      threads.emplace_back([&]() {
+        CreateTableRequestPB req;
+        CreateTableResponsePB resp;
+        RpcController controller;
+
+        req.set_name(kNewName);
+        RowOperationsPBEncoder encoder(req.mutable_split_rows_range_bounds());
+
+        KuduPartialRow split1(&kTableSchema);
+        CHECK_OK(split1.SetInt32("key", 10));
+        encoder.Add(RowOperationsPB::SPLIT_ROW, split1);
+
+        KuduPartialRow split2(&kTableSchema);
+        CHECK_OK(split2.SetInt32("key", 20));
+        encoder.Add(RowOperationsPB::SPLIT_ROW, split2);
+
+        CHECK_OK(SchemaToPB(kTableSchema, req.mutable_schema()));
+        CHECK_OK(proxy_->CreateTable(req, &resp, &controller));
+        SCOPED_TRACE(resp.DebugString());
+
+        // There are three expected outcomes:
+        //
+        // 1. This thread finished well before the others: no error.
+        // 2. This thread raced with another thread: TABLE_NOT_FOUND error with
+        //    ServiceUnavailable status.
+        // 3. This thread finished well after the others: TABLE_ALREADY_PRESENT
+        //    error with AlreadyPresent status.
+        if (resp.has_error()) {
+          Status s = StatusFromPB(resp.error().status());
+          string failure_msg = Substitute("Unexpected response: $0",
+                                          resp.DebugString());
+          switch (resp.error().code()) {
+            case MasterErrorPB::TABLE_NOT_FOUND:
+              CHECK(s.IsServiceUnavailable()) << failure_msg;
+              break;
+            case MasterErrorPB::TABLE_ALREADY_PRESENT:
+              CHECK(s.IsAlreadyPresent()) << failure_msg;
+              break;
+            default:
+              FAIL() << failure_msg;
+          }
+        } else {
+          // Creating the table should only succeed once.
+          CHECK(!create_success.Exchange(true));
+        }
+      });
+    } else {
+      threads.emplace_back([&]() {
+        AlterTableRequestPB req;
+        AlterTableResponsePB resp;
+        RpcController controller;
+
+        req.mutable_table()->set_table_name(kOldName);
+        req.set_new_table_name(kNewName);
+        CHECK_OK(proxy_->AlterTable(req, &resp, &controller));
+        SCOPED_TRACE(resp.DebugString());
+
+        // There are three expected outcomes:
+        //
+        // 1. This thread finished well before the others: no error.
+        // 2. This thread raced with CreateTable(): TABLE_NOT_FOUND error with
+        //    ServiceUnavailable status (if raced during reservation stage)
+        //    or TABLE_ALREADY_PRESENT error with AlreadyPresent status (if
+        //    raced after reservation stage).
+        // 3. This thread raced with AlterTable() or finished well after the
+        //    others: TABLE_NOT_FOUND error with NotFound status.
+        if (resp.has_error()) {
+          Status s = StatusFromPB(resp.error().status());
+          string failure_msg = Substitute("Unexpected response: $0",
+                                          resp.DebugString());
+          switch (resp.error().code()) {
+            case MasterErrorPB::TABLE_NOT_FOUND:
+              CHECK(s.IsServiceUnavailable() || s.IsNotFound()) << failure_msg;
+              break;
+            case MasterErrorPB::TABLE_ALREADY_PRESENT:
+              CHECK(s.IsAlreadyPresent()) << failure_msg;
+              break;
+            default:
+              FAIL() << failure_msg;
+          }
+        } else {
+          // Renaming the table should only succeed once.
+          CHECK(!rename_success.Exchange(true));
+        }
+      });
+    }
+  }
+
+  for (auto& t : threads) {
+    t.join();
+  }
+
+  // At least one of rename or create should have failed; if both succeeded
+  // there must be some sort of race.
+  CHECK(!rename_success.Load() || !create_success.Load());
+
+  unordered_set<string> live_tables;
+  live_tables.insert(kNewName);
+  if (create_success.Load()) {
+    live_tables.insert(kOldName);
+  }
+  MasterMetadataVerifier verifier(live_tables, {});
+  SysCatalogTable* sys_catalog =
+      mini_master_->master()->catalog_manager()->sys_catalog();
+  ASSERT_OK(sys_catalog->VisitTables(&verifier));
+  ASSERT_OK(sys_catalog->VisitTablets(&verifier));
+  ASSERT_OK(verifier.Verify());
+}
+
 } // namespace master
 } // namespace kudu