You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2016/04/27 03:10:57 UTC

[2/3] incubator-kudu git commit: master: don't expose any CreateTable() state too early

master: don't expose any CreateTable() state too early

This was originally addressed by auditing all catalog manager readers (see
commit f971971), but David pointed out that we could do a better job if we
tracked ongoing table creation independently. With that in mind, here's an
approach that fixes the problem in CreateTable(). It's not absolutely
necessary, but:
1. It is forgiving to readers who forget to check the state of the table.
2. It obviates the need for any abort logic should table creation fail.

The new test highlights the tradeoff inherent in not introducing a new lock
that "losers" of the creation race could use to wait on table creation: the
operation can fail in a way that is not quite TABLE_ALREADY_PRESENT but not
quite TABLE_NOT_FOUND either. I chose to express this using a new combination
of status and error code (a ServiceUnavailable status with the TABLE_NOT_FOUND
code) rather than introducing a new code altogether.

Change-Id: Ib9e11037e7f8b4c34db5e0f2b5be00f806532365
Reviewed-on: http://gerrit.cloudera.org:8080/2714
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/6e7a04aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/6e7a04aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/6e7a04aa

Branch: refs/heads/master
Commit: 6e7a04aa54d34864b5bbdd486a96024342f6f302
Parents: af8a4af
Author: Adar Dembo <ad...@cloudera.com>
Authored: Fri Apr 1 21:13:43 2016 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Wed Apr 27 01:08:02 2016 +0000

----------------------------------------------------------------------
 src/kudu/master/catalog_manager.cc | 94 +++++++++++++--------------------
 src/kudu/master/catalog_manager.h  | 13 ++---
 src/kudu/master/master-test.cc     | 58 ++++++++++++++++++++
 3 files changed, 101 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6e7a04aa/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 71f51fb..006bf63 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -84,6 +84,7 @@
 #include "kudu/util/logging.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/random_util.h"
+#include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/stopwatch.h"
 #include "kudu/util/thread.h"
 #include "kudu/util/threadpool.h"
@@ -733,42 +734,6 @@ Status CatalogManager::CheckOnline() const {
   return Status::OK();
 }
 
-void CatalogManager::AbortTableCreation(TableInfo* table,
-                                        const vector<TabletInfo*>& tablets) {
-  string table_id = table->id();
-  string table_name = table->mutable_metadata()->mutable_dirty()->pb.name();
-  vector<string> tablet_ids_to_erase;
-  for (TabletInfo* tablet : tablets) {
-    tablet_ids_to_erase.push_back(tablet->tablet_id());
-  }
-
-  LOG(INFO) << "Aborting creation of table '" << table_name << "', erasing table and tablets (" <<
-      JoinStrings(tablet_ids_to_erase, ",") << ") from in-memory state.";
-
-  // Since this is a failed creation attempt, it's safe to just abort
-  // all tasks, as (by definition) no tasks may be pending against a
-  // table that has failed to succesfully create.
-  table->AbortTasks();
-  table->WaitTasksCompletion();
-
-  boost::lock_guard<LockType> l(lock_);
-
-  // Call AbortMutation() manually, as otherwise the lock won't be
-  // released.
-  for (TabletInfo* tablet : tablets) {
-    tablet->mutable_metadata()->AbortMutation();
-  }
-  table->mutable_metadata()->AbortMutation();
-  for (const string& tablet_id_to_erase : tablet_ids_to_erase) {
-    CHECK_EQ(tablet_map_.erase(tablet_id_to_erase), 1)
-        << "Unable to erase tablet " << tablet_id_to_erase << " from tablet map.";
-  }
-  CHECK_EQ(table_names_map_.erase(table_name), 1)
-      << "Unable to erase table named " << table_name << " from table names map.";
-  CHECK_EQ(table_ids_map_.erase(table_id), 1)
-      << "Unable to erase tablet with id " << table_id << " from tablet ids map.";
-}
-
 // Create a new table.
 // See README file in this directory for a description of the design.
 Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req,
@@ -867,7 +832,6 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req,
   }
 
   scoped_refptr<TableInfo> table;
-  vector<TabletInfo*> tablets;
   {
     boost::lock_guard<LockType> l(lock_);
     TRACE("Acquired catalog manager lock");
@@ -880,28 +844,34 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req,
       return s;
     }
 
-    // c. Add the new table in "preparing" state.
-    table = CreateTableInfo(req, schema, partition_schema);
-    table_ids_map_[table->id()] = table;
-    table_names_map_[req.name()] = table;
-
-    // d. Create the TabletInfo objects in state PREPARING.
-    for (const Partition& partition : partitions) {
-      PartitionPB partition_pb;
-      partition.ToPB(&partition_pb);
-      scoped_refptr<TabletInfo> t = CreateTabletInfo(table.get(), partition_pb);
-      tablets.push_back(t.get());
-
-      // Add the new tablet to the catalog-manager-wide map by tablet ID.
-      InsertOrDie(&tablet_map_, t->tablet_id(), std::move(t));
+    // c. Mark the table as being created (if it isn't already).
+    if (!InsertIfNotPresent(&tables_being_created_, req.name())) {
+      s = Status::ServiceUnavailable("Table is currently being created", req.name());
+      SetupError(resp->mutable_error(), MasterErrorPB::TABLE_NOT_FOUND, s);
+      return s;
     }
+  }
 
-    resp->set_table_id(table->id());
+  // Ensure that if we return, we mark this table as no longer being created.
+  auto cleanup = MakeScopedCleanup([&] () {
+    boost::lock_guard<LockType> l(lock_);
+    CHECK_EQ(1, tables_being_created_.erase(req.name()));
+  });
 
-    // Add the tablets to the table's map.
-    table->AddTablets(tablets);
+  // d. Create the in-memory representation of the new table and its tablets.
+  //    It's not yet in any global maps; that will happen in step g below.
+  table = CreateTableInfo(req, schema, partition_schema);
+  vector<TabletInfo*> tablets;
+  vector<scoped_refptr<TabletInfo>> tablet_refs;
+  for (const Partition& partition : partitions) {
+    PartitionPB partition_pb;
+    partition.ToPB(&partition_pb);
+    scoped_refptr<TabletInfo> t = CreateTabletInfo(table.get(), partition_pb);
+    tablets.push_back(t.get());
+    tablet_refs.emplace_back(std::move(t));
   }
-  TRACE("Inserted new table and tablet info into CatalogManager maps");
+  table->AddTablets(tablets);
+  TRACE("Created new table and tablet info");
 
   // NOTE: the table and tablets are already locked for write at this point,
   // since the CreateTableInfo/CreateTabletInfo functions leave them in that state.
@@ -922,7 +892,6 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req,
     s = s.CloneAndPrepend(Substitute("An error occurred while writing to sys-catalog: $0",
                                      s.ToString()));
     LOG(WARNING) << s.ToString();
-    AbortTableCreation(table.get(), tablets);
     CheckIfNoLongerLeaderAndSetupError(s, resp);
     return s;
   }
@@ -935,6 +904,19 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req,
     tablet->mutable_metadata()->CommitMutation();
   }
 
+  // g. Make the new table and tablets visible in the catalog.
+  {
+    boost::lock_guard<LockType> l(lock_);
+
+    table_ids_map_[table->id()] = table;
+    table_names_map_[req.name()] = table;
+    for (const auto& tablet : tablet_refs) {
+      InsertOrDie(&tablet_map_, tablet->tablet_id(), std::move(tablet));
+    }
+  }
+  TRACE("Inserted table and tablets into CatalogManager maps");
+
+  resp->set_table_id(table->id());
   VLOG(1) << "Created table " << table->ToString();
   background_tasks_->Wake();
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6e7a04aa/src/kudu/master/catalog_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index a7eb896..e003994 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -601,13 +601,6 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
 
   std::string GenerateId() { return oid_generator_.Next(); }
 
-  // Abort creation of 'table': abort all mutation for TabletInfo and
-  // TableInfo objects (releasing all COW locks), abort all pending
-  // tasks associated with the table, and erase any state related to
-  // the table we failed to create from the in-memory maps
-  // ('table_names_map_', 'table_ids_map_', 'tablet_map_' below).
-  void AbortTableCreation(TableInfo* table, const std::vector<TabletInfo*>& tablets);
-
   // Conventional "T xxx P yyy: " prefix for logging.
   std::string LogPrefix() const;
 
@@ -615,7 +608,7 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
   // objects have a copy of the string key. But STL doesn't make it
   // easy to make a "gettable set".
 
-  // Lock protecting the various maps below.
+  // Lock protecting the various maps and sets below.
   typedef rw_spinlock LockType;
   mutable LockType lock_;
 
@@ -628,6 +621,10 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
   typedef std::unordered_map<std::string, scoped_refptr<TabletInfo> > TabletInfoMap;
   TabletInfoMap tablet_map_;
 
+  // Names of tables that are currently being created. Only used in
+  // table creation so that transient tables are not made visible.
+  std::unordered_set<std::string> tables_being_created_;
+
   Master *master_;
   Atomic32 closing_;
   ObjectIdGenerator oid_generator_;

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6e7a04aa/src/kudu/master/master-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/master-test.cc b/src/kudu/master/master-test.cc
index af5c986..2315afe 100644
--- a/src/kudu/master/master-test.cc
+++ b/src/kudu/master/master-test.cc
@@ -846,5 +846,63 @@ TEST_F(MasterTest, TestMasterMetadataConsistentDespiteFailures) {
   ASSERT_OK(verifier.Verify());
 }
 
+static void CreateTableOrFail(const char* kTableName,
+                              const Schema* kSchema,
+                              MasterServiceProxy* proxy) {
+  CreateTableRequestPB req;
+  CreateTableResponsePB resp;
+  RpcController controller;
+
+  req.set_name(kTableName);
+  CHECK_OK(SchemaToPB(*kSchema, req.mutable_schema()));
+  CHECK_OK(proxy->CreateTable(req, &resp, &controller));
+  SCOPED_TRACE(resp.DebugString());
+
+  // There are three expected outcomes:
+  //
+  // 1. This thread won the CreateTable() race: no error.
+  // 2. This thread lost the CreateTable() race: TABLE_NOT_FOUND error
+  //    with ServiceUnavailable status.
+  // 3. This thread arrived after the CreateTable() race was already over:
+  //    TABLE_ALREADY_PRESENT error with AlreadyPresent status.
+  if (resp.has_error()) {
+    Status s = StatusFromPB(resp.error().status());
+    string failure_msg = Substitute("Unexpected response: $0",
+                                    resp.DebugString());
+    switch (resp.error().code()) {
+      case MasterErrorPB::TABLE_NOT_FOUND:
+        CHECK(s.IsServiceUnavailable()) << failure_msg;
+        break;
+      case MasterErrorPB::TABLE_ALREADY_PRESENT:
+        CHECK(s.IsAlreadyPresent()) << failure_msg;
+        break;
+      default:
+        FAIL() << failure_msg;
+    }
+  }
+}
+
+TEST_F(MasterTest, TestConcurrentCreateOfSameTable) {
+  const char* kTableName = "testtb";
+  const Schema kTableSchema({ ColumnSchema("key", INT32),
+                              ColumnSchema("v1", UINT64),
+                              ColumnSchema("v2", STRING) },
+                            1);
+
+  // Kick off a bunch of threads all trying to create the same table.
+  vector<scoped_refptr<Thread>> threads;
+  for (int i = 0; i < 10; i++) {
+    scoped_refptr<Thread> t;
+    EXPECT_OK(Thread::Create("test", "test",
+                             &CreateTableOrFail, kTableName, &kTableSchema,
+                             proxy_.get(), &t));
+    threads.emplace_back(std::move(t));
+  }
+
+  for (const auto& t : threads) {
+    t->Join();
+  }
+}
+
 } // namespace master
 } // namespace kudu