You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by da...@apache.org on 2017/10/12 02:43:28 UTC

kudu git commit: catalog manager: fix DDL race

Repository: kudu
Updated Branches:
  refs/heads/master 0174c2678 -> 9583cad6e


catalog manager: fix DDL race

This fixes a race between concurrent ALTER TABLE RENAME and DROP TABLE
DDL operations. Previously, when both operations ran concurrently on the
same table, both could report success. Without the fix, the included
test fails about 50% of the time.

In addition, this changes the table lookup logic to match on both the
table ID and the table name when both are present in the
TableIdentifier; previously only the table ID was used in that case.
No clients currently send both, so this shouldn't have any real effect,
but I think it's useful to be able to specify both in certain cases.

Change-Id: I84ca3b207da28cd7dc43a077736da9b4e0ec6f37
Reviewed-on: http://gerrit.cloudera.org:8080/8254
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/9583cad6
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/9583cad6
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/9583cad6

Branch: refs/heads/master
Commit: 9583cad6e9e227ddded9eae66019d392a6998094
Parents: 0174c26
Author: Dan Burkert <da...@apache.org>
Authored: Tue Oct 10 16:17:05 2017 -0700
Committer: Dan Burkert <da...@apache.org>
Committed: Thu Oct 12 02:43:13 2017 +0000

----------------------------------------------------------------------
 src/kudu/master/catalog_manager.cc | 128 ++++++++++++++++----------------
 src/kudu/master/catalog_manager.h  |  12 ++-
 src/kudu/master/master-test.cc     | 122 ++++++++++++++++++++++++++++++
 src/kudu/util/cow_object.h         |  28 ++++++-
 4 files changed, 225 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/9583cad6/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 2cfdcc9..1958398 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -1444,19 +1444,16 @@ Status CatalogManager::IsCreateTableDone(const IsCreateTableDoneRequestPB* req,
   leader_lock_.AssertAcquiredForReading();
   RETURN_NOT_OK(CheckOnline());

-  scoped_refptr<TableInfo> table;
-
   // 1. Lookup the table and verify if it exists
-  TRACE("Looking up table");
-  RETURN_NOT_OK(FindTable(req->table(), &table));
+  TRACE("Looking up and locking table");
+  scoped_refptr<TableInfo> table;
+  TableMetadataLock l;
+  RETURN_NOT_OK(FindAndLockTable(req->table(), LockMode::READ, &table, &l));
   if (table == nullptr) {
     Status s = Status::NotFound("The table does not exist", SecureShortDebugString(req->table()));
     SetupError(resp->mutable_error(), MasterErrorPB::TABLE_NOT_FOUND, s);
     return s;
   }
-
-  TRACE("Locking table");
-  TableMetadataLock l(table.get(), LockMode::READ);
   RETURN_NOT_OK(CheckIfTableDeletedOrNotRunning(&l, resp));

   // 2. Verify if the create is in-progress
@@ -1496,17 +1493,45 @@ scoped_refptr<TabletInfo> CatalogManager::CreateTabletInfo(const scoped_refptr<T
   return tablet;
 }

-Status CatalogManager::FindTable(const TableIdentifierPB& table_identifier,
-                                 scoped_refptr<TableInfo> *table_info) {
-  shared_lock<LockType> l(lock_);
+Status CatalogManager::FindAndLockTable(const TableIdentifierPB& table_identifier,
+                                        LockMode lock_mode,
+                                        scoped_refptr<TableInfo>* table_info,
+                                        TableMetadataLock* table_lock) {
+  scoped_refptr<TableInfo> table;
+  {
+    shared_lock<LockType> l(lock_);
+    if (table_identifier.has_table_id()) {
+      table = FindPtrOrNull(table_ids_map_, table_identifier.table_id());
+
+      // If the request contains both a table ID and a table name, ensure that
+      // both resolve to the same table; otherwise treat the table as not found.
+      if (table_identifier.has_table_name() &&
+          table.get() != FindPtrOrNull(table_names_map_, table_identifier.table_name()).get()) {
+        return Status::OK();
+      }
+    } else if (table_identifier.has_table_name()) {
+      table = FindPtrOrNull(table_names_map_, table_identifier.table_name());
+    } else {
+      return Status::InvalidArgument("Missing Table ID or Table Name");
+    }
+  }

-  if (table_identifier.has_table_id()) {
-    *table_info = FindPtrOrNull(table_ids_map_, table_identifier.table_id());
-  } else if (table_identifier.has_table_name()) {
-    *table_info = FindPtrOrNull(table_names_map_, table_identifier.table_name());
-  } else {
-    return Status::InvalidArgument("Missing Table ID or Table Name");
+  // No table matched: return without locking, leaving the out-params untouched.
+  if (!table) {
+    return Status::OK();
   }
+
+  // Acquire the table lock. lock_ is no longer held here, so the name is rechecked below.
+  TableMetadataLock lock(table.get(), lock_mode);
+
+  if (table_identifier.has_table_name() && table_identifier.table_name() != lock.data().name()) {
+    // The locked table's name does not match the requested name (e.g. the table
+    // was concurrently renamed); treat it as not found under the requested name.
+    return Status::OK();
+  }
+
+  *table_info = std::move(table);
+  *table_lock = std::move(lock);
   return Status::OK();
 }

@@ -1520,17 +1545,15 @@ Status CatalogManager::DeleteTable(const DeleteTableRequestPB* req,
                           RequestorString(rpc), SecureShortDebugString(*req));

   // 1. Look up the table, lock it, and mark it as removed.
-  TRACE("Looking up table");
+  TRACE("Looking up and locking table");
   scoped_refptr<TableInfo> table;
-  RETURN_NOT_OK(FindTable(req->table(), &table));
+  TableMetadataLock l;
+  RETURN_NOT_OK(FindAndLockTable(req->table(), LockMode::WRITE, &table, &l));
   if (table == nullptr) {
     Status s = Status::NotFound("The table does not exist", SecureShortDebugString(req->table()));
     SetupError(resp->mutable_error(), MasterErrorPB::TABLE_NOT_FOUND, s);
     return s;
   }
-
-  TRACE("Locking table");
-  TableMetadataLock l(table.get(), LockMode::WRITE);
   if (l.data().is_deleted()) {
     Status s = Status::NotFound("The table was deleted", l.data().pb.state_msg());
     SetupError(resp->mutable_error(), MasterErrorPB::TABLE_NOT_FOUND, s);
@@ -1887,39 +1910,25 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
   }

   // 2. Lookup the table, verify if it exists, and lock it for modification.
-  TRACE("Looking up table");
+  TRACE("Looking up and locking table");
   scoped_refptr<TableInfo> table;
-  RETURN_NOT_OK(FindTable(req->table(), &table));
+  TableMetadataLock l;
+  RETURN_NOT_OK(FindAndLockTable(req->table(), LockMode::WRITE, &table, &l));
   if (table == nullptr) {
     Status s = Status::NotFound("The table does not exist", SecureShortDebugString(req->table()));
     SetupError(resp->mutable_error(), MasterErrorPB::TABLE_NOT_FOUND, s);
     return s;
   }
-
-  TRACE("Locking table");
-  TableMetadataLock l(table.get(), LockMode::WRITE);
   if (l.data().is_deleted()) {
     Status s = Status::NotFound("The table was deleted", l.data().pb.state_msg());
     SetupError(resp->mutable_error(), MasterErrorPB::TABLE_NOT_FOUND, s);
     return s;
   }

-  // 3. Having locked the table, look it up again, in case we raced with another
-  //    AlterTable() that renamed our table.
-  {
-    scoped_refptr<TableInfo> table_again;
-    CHECK_OK(FindTable(req->table(), &table_again));
-    if (table_again == nullptr) {
-      Status s = Status::NotFound("The table does not exist", SecureShortDebugString(req->table()));
-      SetupError(resp->mutable_error(), MasterErrorPB::TABLE_NOT_FOUND, s);
-      return s;
-    }
-  }
-
   string table_name = l.data().name();
   *resp->mutable_table_id() = table->id();

-  // 4. Calculate and validate new schema for the on-disk state, not persisted yet.
+  // 3. Calculate and validate new schema for the on-disk state, not persisted yet.
   Schema new_schema;
   ColumnId next_col_id = ColumnId(l.data().pb.next_column_id());
   if (!alter_schema_steps.empty()) {
@@ -1941,7 +1950,7 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
     }
   }

-  // 5. Validate and try to acquire the new table name.
+  // 4. Validate and try to acquire the new table name.
   if (req->has_new_table_name()) {
     Status s = ValidateIdentifier(req->new_table_name());
     if (!s.ok()) {
@@ -1981,7 +1990,7 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
     }
   });

-  // 6. Alter table partitioning.
+  // 5. Alter table partitioning.
   vector<scoped_refptr<TabletInfo>> tablets_to_add;
   vector<scoped_refptr<TabletInfo>> tablets_to_drop;
   if (!alter_partitioning_steps.empty()) {
@@ -2011,7 +2020,7 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
     return Status::OK();
   }

-  // 7. Serialize the schema and increment the version number.
+  // 6. Serialize the schema and increment the version number.
   if (has_metadata_changes_for_existing_tablets && !l.data().pb.has_fully_applied_schema()) {
     l.mutable_data()->pb.mutable_fully_applied_schema()->CopyFrom(l.data().pb.schema());
   }
@@ -2032,7 +2041,7 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
                                            LocalTimeAsString()));
   }

-  // 8. Update sys-catalog with the new table schema and tablets to add/drop.
+  // 7. Update sys-catalog with the new table schema and tablets to add/drop.
   TRACE("Updating metadata on disk");
   string deletion_msg = "Partition dropped at " + LocalTimeAsString();
   SysCatalogTable::Actions actions;
@@ -2063,7 +2072,7 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
     return s;
   }

-  // 9. Commit the in-memory state.
+  // 8. Commit the in-memory state.
   {
     TRACE("Committing alterations to in-memory state");
     // Commit new tablet in-memory state. This doesn't require taking the global
@@ -2137,19 +2146,16 @@ Status CatalogManager::IsAlterTableDone(const IsAlterTableDoneRequestPB* req,
   leader_lock_.AssertAcquiredForReading();
   RETURN_NOT_OK(CheckOnline());

-  scoped_refptr<TableInfo> table;
-
   // 1. Lookup the table and verify if it exists
-  TRACE("Looking up table");
-  RETURN_NOT_OK(FindTable(req->table(), &table));
+  TRACE("Looking up and locking table");
+  scoped_refptr<TableInfo> table;
+  TableMetadataLock l;
+  RETURN_NOT_OK(FindAndLockTable(req->table(), LockMode::READ, &table, &l));
   if (table == nullptr) {
     Status s = Status::NotFound("The table does not exist", SecureShortDebugString(req->table()));
     SetupError(resp->mutable_error(), MasterErrorPB::TABLE_NOT_FOUND, s);
     return s;
   }
-
-  TRACE("Locking table");
-  TableMetadataLock l(table.get(), LockMode::READ);
   RETURN_NOT_OK(CheckIfTableDeletedOrNotRunning(&l, resp));

   // 2. Verify if the alter is in-progress
@@ -2165,19 +2171,16 @@ Status CatalogManager::GetTableSchema(const GetTableSchemaRequestPB* req,
   leader_lock_.AssertAcquiredForReading();
   RETURN_NOT_OK(CheckOnline());

+  // Look up the table and verify that it exists.
+  TRACE("Looking up and locking table");
   scoped_refptr<TableInfo> table;
-
-  // 1. Lookup the table and verify if it exists
-  TRACE("Looking up table");
-  RETURN_NOT_OK(FindTable(req->table(), &table));
+  TableMetadataLock l;
+  RETURN_NOT_OK(FindAndLockTable(req->table(), LockMode::READ, &table, &l));
   if (table == nullptr) {
     Status s = Status::NotFound("The table does not exist", SecureShortDebugString(req->table()));
     SetupError(resp->mutable_error(), MasterErrorPB::TABLE_NOT_FOUND, s);
     return s;
   }
-
-  TRACE("Locking table");
-  TableMetadataLock l(table.get(), LockMode::READ);
   RETURN_NOT_OK(CheckIfTableDeletedOrNotRunning(&l, resp));

   if (l.data().pb.has_fully_applied_schema()) {
@@ -3953,15 +3956,16 @@ Status CatalogManager::GetTableLocations(const GetTableLocationsRequestPB* req,
     return Status::InvalidArgument("max_returned_locations must be greater than 0");
   }

+  // Look up the table and verify that it exists.
+  TRACE("Looking up and locking table");
   scoped_refptr<TableInfo> table;
-  RETURN_NOT_OK(FindTable(req->table(), &table));
+  TableMetadataLock l;
+  RETURN_NOT_OK(FindAndLockTable(req->table(), LockMode::READ, &table, &l));
   if (table == nullptr) {
-    Status s = Status::NotFound("The table does not exist");
+    Status s = Status::NotFound("The table does not exist", SecureShortDebugString(req->table()));
     SetupError(resp->mutable_error(), MasterErrorPB::TABLE_NOT_FOUND, s);
     return s;
   }
-
-  TableMetadataLock l(table.get(), LockMode::READ);
   RETURN_NOT_OK(CheckIfTableDeletedOrNotRunning(&l, resp));

   vector<scoped_refptr<TabletInfo>> tablets_in_range;

http://git-wip-us.apache.org/repos/asf/kudu/blob/9583cad6/src/kudu/master/catalog_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index 41d0134..aab38dc 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -36,6 +36,7 @@
 #include "kudu/consensus/metadata.pb.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/master/master.pb.h"
 #include "kudu/master/ts_manager.h"
@@ -329,6 +330,9 @@ template<class MetadataClass>
 class MetadataLock : public CowLock<typename MetadataClass::cow_state> {
  public:
   typedef CowLock<typename MetadataClass::cow_state> super;
+  MetadataLock()  // Constructs an unlocked lock, e.g. as a move target.
+      : super() {
+  }
   MetadataLock(MetadataClass* info, LockMode mode)
       : super(DCHECK_NOTNULL(info)->mutable_metadata(), mode) {
   }
@@ -719,8 +723,12 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
   Status BuildLocationsForTablet(const scoped_refptr<TabletInfo>& tablet,
                                  TabletLocationsPB* locs_pb);

-  Status FindTable(const TableIdentifierPB& table_identifier,
-                   scoped_refptr<TableInfo>* table_info);
+  // Looks up the table and locks it with the provided lock mode. If no table
+  // matches, returns OK and leaves 'table_info' and 'table_lock' unmodified.
+  Status FindAndLockTable(const TableIdentifierPB& table_identifier,
+                          LockMode lock_mode,
+                          scoped_refptr<TableInfo>* table_info,
+                          TableMetadataLock* table_lock) WARN_UNUSED_RESULT;

   // Extract the set of tablets that must be processed because not running yet.
   void ExtractTabletsToProcess(std::vector<scoped_refptr<TabletInfo>>* tablets_to_process);

http://git-wip-us.apache.org/repos/asf/kudu/blob/9583cad6/src/kudu/master/master-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/master-test.cc b/src/kudu/master/master-test.cc
index dc5da9a..9656dc3 100644
--- a/src/kudu/master/master-test.cc
+++ b/src/kudu/master/master-test.cc
@@ -1351,6 +1351,68 @@ TEST_F(MasterTest, TestConcurrentCreateAndRenameOfSameTable) {
   ASSERT_OK(verifier.Verify());
 }

+TEST_F(MasterTest, TestConcurrentRenameAndDeleteOfSameTable) {
+  const char* kTableName = "testtb";
+  const Schema kTableSchema({ ColumnSchema("key", INT32) }, 1);
+
+  ASSERT_OK(CreateTable(kTableName, kTableSchema));
+
+  bool renamed = false;
+  bool deleted = false;
+
+  thread renamer([&] {
+      AlterTableRequestPB req;
+      AlterTableResponsePB resp;
+      RpcController controller;
+
+      req.mutable_table()->set_table_name(kTableName);
+      req.set_new_table_name("testtb-renamed");
+      CHECK_OK(proxy_->AlterTable(req, &resp, &controller));
+
+      // There are two expected outcomes:
+      //
+      // 1. This thread won the race: no error.
+      // 2. This thread lost the race: TABLE_NOT_FOUND error with NotFound status.
+      if (resp.has_error()) {
+        Status s = StatusFromPB(resp.error().status());
+        string failure_msg = Substitute("Unexpected response: $0",
+                                        SecureDebugString(resp));
+        CHECK_EQ(MasterErrorPB::TABLE_NOT_FOUND, resp.error().code()) << failure_msg;
+        CHECK(s.IsNotFound()) << failure_msg;
+      } else {
+        renamed = true;
+      }
+  });
+
+  thread dropper([&] {
+      DeleteTableRequestPB req;
+      DeleteTableResponsePB resp;
+      RpcController controller;
+
+      req.mutable_table()->set_table_name(kTableName);
+      CHECK_OK(proxy_->DeleteTable(req, &resp, &controller));
+
+      // There are two expected outcomes:
+      //
+      // 1. This thread won the race: no error.
+      // 2. This thread lost the race: TABLE_NOT_FOUND error with NotFound status.
+      if (resp.has_error()) {
+        Status s = StatusFromPB(resp.error().status());
+        string failure_msg = Substitute("Unexpected response: $0",
+                                        SecureDebugString(resp));
+        CHECK_EQ(MasterErrorPB::TABLE_NOT_FOUND, resp.error().code()) << failure_msg;
+        CHECK(s.IsNotFound()) << failure_msg;
+      } else {
+        deleted = true;
+      }
+  });
+
+  renamer.join();
+  dropper.join();
+  // Exactly one of the concurrent rename and delete must have succeeded.
+  ASSERT_TRUE(renamed ^ deleted);
+}
+
 // Unit tests for the ConnectToMaster() RPC:
 // should issue authentication tokens and the master CA cert.
 TEST_F(MasterTest, TestConnectToMaster) {
@@ -1383,5 +1445,65 @@ TEST_F(MasterTest, TestSignOwnCertAndLoadTSKs) {
     });
 }

+TEST_F(MasterTest, TestTableIdentifierWithIdAndName) {
+  const char *kTableName = "testtb";
+  const Schema kTableSchema({ ColumnSchema("key", INT32) }, 1);
+
+  ASSERT_OK(CreateTable(kTableName, kTableSchema));
+
+  ListTablesResponsePB tables;
+  ASSERT_NO_FATAL_FAILURE(DoListAllTables(&tables));
+  ASSERT_EQ(1, tables.tables_size());
+  ASSERT_EQ(kTableName, tables.tables(0).name());
+  string table_id = tables.tables(0).id();
+  ASSERT_FALSE(table_id.empty());
+
+  // Correct name but a wrong table ID: must fail with TABLE_NOT_FOUND.
+  {
+    DeleteTableRequestPB req;
+    DeleteTableResponsePB resp;
+    RpcController controller;
+    req.mutable_table()->set_table_name(kTableName);
+    req.mutable_table()->set_table_id("abc123");
+    ASSERT_OK(proxy_->DeleteTable(req, &resp, &controller));
+    ASSERT_TRUE(resp.has_error());
+    ASSERT_EQ(MasterErrorPB::TABLE_NOT_FOUND, resp.error().code());
+  }
+
+  // Correct table ID but a wrong name: must fail with TABLE_NOT_FOUND.
+  {
+    DeleteTableRequestPB req;
+    DeleteTableResponsePB resp;
+    RpcController controller;
+    req.mutable_table()->set_table_name("abc123");
+    req.mutable_table()->set_table_id(table_id);
+    ASSERT_OK(proxy_->DeleteTable(req, &resp, &controller));
+    ASSERT_TRUE(resp.has_error());
+    ASSERT_EQ(MasterErrorPB::TABLE_NOT_FOUND, resp.error().code());
+  }
+
+  // Both the ID and the name are wrong: must fail with TABLE_NOT_FOUND.
+  {
+    DeleteTableRequestPB req;
+    DeleteTableResponsePB resp;
+    RpcController controller;
+    req.mutable_table()->set_table_name("abc123");
+    req.mutable_table()->set_table_id("abc123");
+    ASSERT_OK(proxy_->DeleteTable(req, &resp, &controller));
+    ASSERT_TRUE(resp.has_error());
+    ASSERT_EQ(MasterErrorPB::TABLE_NOT_FOUND, resp.error().code());
+  }
+  // Matching ID and name: the delete succeeds.
+  {
+    DeleteTableRequestPB req;
+    DeleteTableResponsePB resp;
+    RpcController controller;
+    req.mutable_table()->set_table_name(kTableName);
+    req.mutable_table()->set_table_id(table_id);
+    ASSERT_OK(proxy_->DeleteTable(req, &resp, &controller));
+    ASSERT_FALSE(resp.has_error()) << resp.error().DebugString();
+  }
+}
+
 } // namespace master
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/9583cad6/src/kudu/util/cow_object.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/cow_object.h b/src/kudu/util/cow_object.h
index 05c2d7e..55c30b0 100644
--- a/src/kudu/util/cow_object.h
+++ b/src/kudu/util/cow_object.h
@@ -167,6 +167,14 @@ std::ostream& operator<<(std::ostream& o, LockMode m);
 template<class State>
 class CowLock {
  public:
+
+   // An unlocked CowLock. This is useful for default-constructing a lock to be
+   // moved into later.
+   CowLock()
+    : cow_(nullptr),
+      mode_(LockMode::RELEASED) {
+   }
+
   // Lock in either read or write mode.
   CowLock(CowObject<State>* cow,
           LockMode mode)
@@ -192,6 +200,25 @@ class CowLock {
     }
   }

+  // Disable copying.
+  CowLock(const CowLock&) = delete;
+  CowLock& operator=(const CowLock&) = delete;
+
+  // Allow moving. The moved-from lock is left unlocked (LockMode::RELEASED).
+  CowLock(CowLock&& other)
+    : cow_(other.cow_),
+      mode_(other.mode_) {
+    other.cow_ = nullptr;
+    other.mode_ = LockMode::RELEASED;
+  }
+  CowLock& operator=(CowLock&& other) {  // NOTE(review): never releases a lock held by *this.
+    cow_ = other.cow_;
+    mode_ = other.mode_;
+    other.cow_ = nullptr;
+    other.mode_ = LockMode::RELEASED;
+    return *this;
+  }
+
   // Commit the underlying object.
   // Requires that the caller hold the lock in write mode.
   void Commit() {
@@ -242,7 +269,6 @@ class CowLock {
  private:
   CowObject<State>* cow_;
   LockMode mode_;
-  DISALLOW_COPY_AND_ASSIGN(CowLock);
 };

 // Scoped object that locks multiple CowObjects for reading or for writing.