You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by da...@apache.org on 2018/05/14 19:20:31 UTC

kudu git commit: catalog-manager: refactor AlterTable and DeleteTable methods

Repository: kudu
Updated Branches:
  refs/heads/master a65e58ec0 -> ca3e38759


catalog-manager: refactor AlterTable and DeleteTable methods

This commit splits CatalogManager::AlterTable and
CatalogManager::DeleteTable in two. One method handles RPC specifics,
and the second handles applying the alter/delete operation to the Kudu
catalog. The RPC-handling method thus calls into the method which
modifies the catalog.  A follow-up commit in the HMS integration series
will add another front-end method specific to HMS notification log
listener events.

Change-Id: Ia384768ee7246411052ccadc66c33e83b541c195
Reviewed-on: http://gerrit.cloudera.org:8080/10378
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/ca3e3875
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/ca3e3875
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/ca3e3875

Branch: refs/heads/master
Commit: ca3e387591d1947c24f9af814bb6429447902db5
Parents: a65e58e
Author: Dan Burkert <da...@apache.org>
Authored: Thu May 10 13:01:33 2018 -0700
Committer: Dan Burkert <da...@apache.org>
Committed: Mon May 14 19:20:05 2018 +0000

----------------------------------------------------------------------
 src/kudu/integration-tests/alter_table-test.cc |  2 +-
 src/kudu/master/catalog_manager.cc             | 75 ++++++++++++---------
 src/kudu/master/catalog_manager.h              | 25 ++++---
 src/kudu/master/master_service.cc              |  4 +-
 4 files changed, 62 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/ca3e3875/src/kudu/integration-tests/alter_table-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/alter_table-test.cc b/src/kudu/integration-tests/alter_table-test.cc
index f771209..640f1cb 100644
--- a/src/kudu/integration-tests/alter_table-test.cc
+++ b/src/kudu/integration-tests/alter_table-test.cc
@@ -344,7 +344,7 @@ TEST_F(AlterTableTest, TestAddNotNullableColumnWithoutDefaults) {
         cluster_->mini_master()->master()->catalog_manager();
     master::CatalogManager::ScopedLeaderSharedLock l(catalog);
     ASSERT_OK(l.first_failed_status());
-    Status s = catalog->AlterTable(&req, &resp, nullptr);
+    Status s = catalog->AlterTableRpc(req, &resp, nullptr);
     ASSERT_TRUE(s.IsInvalidArgument());
     ASSERT_STR_CONTAINS(s.ToString(), "column `c2`: NOT NULL columns must have a default");
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/ca3e3875/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 174c7fc..6438901 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -1700,19 +1700,22 @@ Status CatalogManager::FindAndLockTable(const ReqClass& request,
   return Status::OK();
 }
 
-Status CatalogManager::DeleteTable(const DeleteTableRequestPB* req,
-                                   DeleteTableResponsePB* resp,
-                                   rpc::RpcContext* rpc) {
+Status CatalogManager::DeleteTableRpc(const DeleteTableRequestPB& req,
+                                      DeleteTableResponsePB* resp,
+                                      rpc::RpcContext* rpc) {
+  LOG(INFO) << Substitute("Servicing DeleteTable request from $0:\n$1",
+                          RequestorString(rpc), SecureShortDebugString(req));
+  return DeleteTable(req, resp);
+}
+
+Status CatalogManager::DeleteTable(const DeleteTableRequestPB& req, DeleteTableResponsePB* resp) {
   leader_lock_.AssertAcquiredForReading();
   RETURN_NOT_OK(CheckOnline());
 
-  LOG(INFO) << Substitute("Servicing DeleteTable request from $0:\n$1",
-                          RequestorString(rpc), SecureShortDebugString(*req));
-
   // 1. Look up the table, lock it, and mark it as removed.
   scoped_refptr<TableInfo> table;
   TableMetadataLock l;
-  RETURN_NOT_OK(FindAndLockTable(*req, resp, LockMode::WRITE, &table, &l));
+  RETURN_NOT_OK(FindAndLockTable(req, resp, LockMode::WRITE, &table, &l));
   if (l.data().is_deleted()) {
     return SetupError(Status::NotFound("the table was deleted", l.data().pb.state_msg()),
         resp, MasterErrorPB::TABLE_NOT_FOUND);
@@ -1788,7 +1791,9 @@ Status CatalogManager::DeleteTable(const DeleteTableRequestPB* req,
       TRACE("Removing table from by-name map");
       std::lock_guard<LockType> l_map(lock_);
       if (table_names_map_.erase(l.data().name()) != 1) {
-        PANIC_RPC(rpc, "Could not remove table from map, name=" + l.data().name());
+        LOG(FATAL) << "Could not remove table " << table->ToString()
+                   << " from map in response to DeleteTable request: "
+                   << SecureShortDebugString(req);
       }
     }
 
@@ -2064,19 +2069,22 @@ Status CatalogManager::ApplyAlterPartitioningSteps(
   return Status::OK();
 }
 
-Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
-                                  AlterTableResponsePB* resp,
-                                  rpc::RpcContext* rpc) {
+Status CatalogManager::AlterTableRpc(const AlterTableRequestPB& req,
+                                     AlterTableResponsePB* resp,
+                                     rpc::RpcContext* rpc) {
+  LOG(INFO) << Substitute("Servicing AlterTable request from $0:\n$1",
+                          RequestorString(rpc), SecureShortDebugString(req));
+  return AlterTable(req, resp);
+}
+
+Status CatalogManager::AlterTable(const AlterTableRequestPB& req, AlterTableResponsePB* resp) {
   leader_lock_.AssertAcquiredForReading();
   RETURN_NOT_OK(CheckOnline());
 
-  LOG(INFO) << Substitute("Servicing AlterTable request from $0:\n$1",
-                          RequestorString(rpc), SecureShortDebugString(*req));
-
   // 1. Group the steps into schema altering steps and partition altering steps.
   vector<AlterTableRequestPB::Step> alter_schema_steps;
   vector<AlterTableRequestPB::Step> alter_partitioning_steps;
-  for (const auto& step : req->alter_schema_steps()) {
+  for (const auto& step : req.alter_schema_steps()) {
     switch (step.type()) {
       case AlterTableRequestPB::ADD_COLUMN:
       case AlterTableRequestPB::DROP_COLUMN:
@@ -2099,7 +2107,7 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
   // 2. Lookup the table, verify if it exists, and lock it for modification.
   scoped_refptr<TableInfo> table;
   TableMetadataLock l;
-  RETURN_NOT_OK(FindAndLockTable(*req, resp, LockMode::WRITE, &table, &l));
+  RETURN_NOT_OK(FindAndLockTable(req, resp, LockMode::WRITE, &table, &l));
   if (l.data().is_deleted()) {
     return SetupError(
         Status::NotFound("the table was deleted", l.data().pb.state_msg()),
@@ -2131,43 +2139,43 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
         resp, MasterErrorPB::INVALID_SCHEMA));
 
   // 4. Validate and try to acquire the new table name.
-  if (req->has_new_table_name()) {
+  if (req.has_new_table_name()) {
     RETURN_NOT_OK(SetupError(
-          ValidateIdentifier(req->new_table_name()).CloneAndPrepend("invalid table name"),
+          ValidateIdentifier(req.new_table_name()).CloneAndPrepend("invalid table name"),
           resp, MasterErrorPB::INVALID_SCHEMA));
 
     std::lock_guard<LockType> catalog_lock(lock_);
     TRACE("Acquired catalog manager lock");
 
     // Verify that the table does not exist.
-    scoped_refptr<TableInfo> other_table = FindPtrOrNull(table_names_map_, req->new_table_name());
+    scoped_refptr<TableInfo> other_table = FindPtrOrNull(table_names_map_, req.new_table_name());
     if (other_table != nullptr) {
       return SetupError(
           Status::AlreadyPresent(Substitute("table $0 already exists with id $1",
-              req->new_table_name(), table->id())),
+              req.new_table_name(), table->id())),
           resp, MasterErrorPB::TABLE_ALREADY_PRESENT);
     }
 
     // Reserve the new table name if possible.
-    if (!InsertIfNotPresent(&reserved_table_names_, req->new_table_name())) {
+    if (!InsertIfNotPresent(&reserved_table_names_, req.new_table_name())) {
       // ServiceUnavailable will cause the client to retry the create table
       // request. We don't want to outright fail the request with
       // 'AlreadyPresent', because a table name reservation can be rolled back
       // in the case of an error. Instead, we force the client to retry at a
       // later time.
       return SetupError(Status::ServiceUnavailable(Substitute(
-              "table name $0 is already reserved", req->new_table_name())),
+              "table name $0 is already reserved", req.new_table_name())),
           resp, MasterErrorPB::TABLE_ALREADY_PRESENT);
     }
 
-    l.mutable_data()->pb.set_name(req->new_table_name());
+    l.mutable_data()->pb.set_name(req.new_table_name());
   }
 
   // Ensure that we drop our reservation upon return.
   auto cleanup = MakeScopedCleanup([&] () {
-    if (req->has_new_table_name()) {
+    if (req.has_new_table_name()) {
       std::lock_guard<LockType> l(lock_);
-      CHECK_EQ(1, reserved_table_names_.erase(req->new_table_name()));
+      CHECK_EQ(1, reserved_table_names_.erase(req.new_table_name()));
     }
   });
 
@@ -2177,7 +2185,7 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
   if (!alter_partitioning_steps.empty()) {
     TRACE("Apply alter partitioning");
     Schema client_schema;
-    RETURN_NOT_OK(SetupError(SchemaFromPB(req->schema(), &client_schema),
+    RETURN_NOT_OK(SetupError(SchemaFromPB(req.schema(), &client_schema),
           resp, MasterErrorPB::UNKNOWN_ERROR));
     RETURN_NOT_OK(SetupError(
           ApplyAlterPartitioningSteps(l, table, client_schema, alter_partitioning_steps,
@@ -2188,7 +2196,7 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
   // Set to true if columns are altered, added or dropped.
   bool has_schema_changes = !alter_schema_steps.empty();
   // Set to true if there are schema changes, or the table is renamed.
-  bool has_metadata_changes = has_schema_changes || req->has_new_table_name();
+  bool has_metadata_changes = has_schema_changes || req.has_new_table_name();
   // Set to true if there are partitioning changes.
   bool has_partitioning_changes = !alter_partitioning_steps.empty();
   // Set to true if metadata changes need to be applied to existing tablets.
@@ -2227,7 +2235,7 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
   // HMS is available. We do not allow altering tables while the HMS is
   // unavailable, because that would cause the catalogs to become unsynchronized.
   if (hms_catalog_ != nullptr && has_metadata_changes) {
-    const string& new_name = req->has_new_table_name() ? req->new_table_name() : table_name;
+    const string& new_name = req.has_new_table_name() ? req.new_table_name() : table_name;
     Status s = hms_catalog_->AlterTable(table->id(), table_name, new_name, new_schema);
 
     if (!s.ok()) {
@@ -2249,7 +2257,7 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
                        << s.ToString();
           return;
         }
-        const string& new_name = req->has_new_table_name() ? req->new_table_name() : table_name;
+        const string& new_name = req.has_new_table_name() ? req.new_table_name() : table_name;
 
         WARN_NOT_OK(hms_catalog_->AlterTable(table->id(), new_name, table_name, schema),
                     "An error occurred while attempting to roll-back HMS table entry alteration");
@@ -2299,12 +2307,13 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
     // Take the global catalog manager lock in order to modify the global table
     // and tablets indices.
     std::lock_guard<LockType> lock(lock_);
-    if (req->has_new_table_name()) {
+    if (req.has_new_table_name()) {
       if (table_names_map_.erase(table_name) != 1) {
-        PANIC_RPC(rpc, Substitute(
-            "Could not remove table (name $0) from map", table_name));
+        LOG(FATAL) << "Could not remove table " << table->ToString()
+                   << " from map in response to AlterTable request: "
+                   << SecureShortDebugString(req);
       }
-      InsertOrDie(&table_names_map_, req->new_table_name(), table);
+      InsertOrDie(&table_names_map_, req.new_table_name(), table);
     }
 
     // Insert new tablets into the global tablet map. After this, the tablets

http://git-wip-us.apache.org/repos/asf/kudu/blob/ca3e3875/src/kudu/master/catalog_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index 957c3ea..447a391 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -533,21 +533,21 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
   Status IsCreateTableDone(const IsCreateTableDoneRequestPB* req,
                            IsCreateTableDoneResponsePB* resp);
 
-  // Delete the specified table
+  // Delete the specified table in response to a DeleteTableRequest RPC.
   //
   // The RPC context is provided for logging/tracing purposes,
   // but this function does not itself respond to the RPC.
-  Status DeleteTable(const DeleteTableRequestPB* req,
-                     DeleteTableResponsePB* resp,
-                     rpc::RpcContext* rpc);
+  Status DeleteTableRpc(const DeleteTableRequestPB& req,
+                        DeleteTableResponsePB* resp,
+                        rpc::RpcContext* rpc);
 
-  // Alter the specified table
+  // Alter the specified table in response to an AlterTableRequest RPC.
   //
   // The RPC context is provided for logging/tracing purposes,
   // but this function does not itself respond to the RPC.
-  Status AlterTable(const AlterTableRequestPB* req,
-                    AlterTableResponsePB* resp,
-                    rpc::RpcContext* rpc);
+  Status AlterTableRpc(const AlterTableRequestPB& req,
+                       AlterTableResponsePB* resp,
+                       rpc::RpcContext* rpc);
 
   // Get the information about an in-progress alter operation
   //
@@ -664,6 +664,15 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
   typedef std::unordered_map<std::string, scoped_refptr<TableInfo>> TableInfoMap;
   typedef std::unordered_map<std::string, scoped_refptr<TabletInfo>> TabletInfoMap;
 
+  // Delete the specified table in the catalog.
+  Status DeleteTable(const DeleteTableRequestPB& req,
+                     DeleteTableResponsePB* resp) WARN_UNUSED_RESULT;
+
+
+  // Alter the specified table in the catalog.
+  Status AlterTable(const AlterTableRequestPB& req,
+                    AlterTableResponsePB* resp) WARN_UNUSED_RESULT;
+
   // Called by SysCatalog::SysCatalogStateChanged when this node
   // becomes the leader of a consensus configuration. Executes
   // PrepareForLeadershipTask() via 'worker_pool_'.

http://git-wip-us.apache.org/repos/asf/kudu/blob/ca3e3875/src/kudu/master/master_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/master_service.cc b/src/kudu/master/master_service.cc
index 5ccaa66..b71f470 100644
--- a/src/kudu/master/master_service.cc
+++ b/src/kudu/master/master_service.cc
@@ -332,7 +332,7 @@ void MasterServiceImpl::DeleteTable(const DeleteTableRequestPB* req,
     return;
   }
 
-  Status s = server_->catalog_manager()->DeleteTable(req, resp, rpc);
+  Status s = server_->catalog_manager()->DeleteTableRpc(*req, resp, rpc);
   CheckRespErrorOrSetUnknown(s, resp);
   rpc->RespondSuccess();
 }
@@ -345,7 +345,7 @@ void MasterServiceImpl::AlterTable(const AlterTableRequestPB* req,
     return;
   }
 
-  Status s = server_->catalog_manager()->AlterTable(req, resp, rpc);
+  Status s = server_->catalog_manager()->AlterTableRpc(*req, resp, rpc);
   CheckRespErrorOrSetUnknown(s, resp);
   rpc->RespondSuccess();
 }