You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by da...@apache.org on 2018/05/14 19:20:31 UTC
kudu git commit: catalog-manager: refactor AlterTable and DeleteTable
methods
Repository: kudu
Updated Branches:
refs/heads/master a65e58ec0 -> ca3e38759
catalog-manager: refactor AlterTable and DeleteTable methods
This commit splits CatalogManager::AlterTable and
CatalogManager::DeleteTable in two. One method handles RPC specifics,
and the second handles applying the alter/delete operation to the Kudu
catalog. The RPC-handling method thus calls into the method which
modifies the catalog. A follow-up commit in the HMS integration series
will add another front-end method specific to HMS notification log
listener events.
Change-Id: Ia384768ee7246411052ccadc66c33e83b541c195
Reviewed-on: http://gerrit.cloudera.org:8080/10378
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/ca3e3875
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/ca3e3875
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/ca3e3875
Branch: refs/heads/master
Commit: ca3e387591d1947c24f9af814bb6429447902db5
Parents: a65e58e
Author: Dan Burkert <da...@apache.org>
Authored: Thu May 10 13:01:33 2018 -0700
Committer: Dan Burkert <da...@apache.org>
Committed: Mon May 14 19:20:05 2018 +0000
----------------------------------------------------------------------
src/kudu/integration-tests/alter_table-test.cc | 2 +-
src/kudu/master/catalog_manager.cc | 75 ++++++++++++---------
src/kudu/master/catalog_manager.h | 25 ++++---
src/kudu/master/master_service.cc | 4 +-
4 files changed, 62 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/ca3e3875/src/kudu/integration-tests/alter_table-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/alter_table-test.cc b/src/kudu/integration-tests/alter_table-test.cc
index f771209..640f1cb 100644
--- a/src/kudu/integration-tests/alter_table-test.cc
+++ b/src/kudu/integration-tests/alter_table-test.cc
@@ -344,7 +344,7 @@ TEST_F(AlterTableTest, TestAddNotNullableColumnWithoutDefaults) {
cluster_->mini_master()->master()->catalog_manager();
master::CatalogManager::ScopedLeaderSharedLock l(catalog);
ASSERT_OK(l.first_failed_status());
- Status s = catalog->AlterTable(&req, &resp, nullptr);
+ Status s = catalog->AlterTableRpc(req, &resp, nullptr);
ASSERT_TRUE(s.IsInvalidArgument());
ASSERT_STR_CONTAINS(s.ToString(), "column `c2`: NOT NULL columns must have a default");
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/ca3e3875/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 174c7fc..6438901 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -1700,19 +1700,22 @@ Status CatalogManager::FindAndLockTable(const ReqClass& request,
return Status::OK();
}
-Status CatalogManager::DeleteTable(const DeleteTableRequestPB* req,
- DeleteTableResponsePB* resp,
- rpc::RpcContext* rpc) {
+Status CatalogManager::DeleteTableRpc(const DeleteTableRequestPB& req,
+ DeleteTableResponsePB* resp,
+ rpc::RpcContext* rpc) {
+ LOG(INFO) << Substitute("Servicing DeleteTable request from $0:\n$1",
+ RequestorString(rpc), SecureShortDebugString(req));
+ return DeleteTable(req, resp);
+}
+
+Status CatalogManager::DeleteTable(const DeleteTableRequestPB& req, DeleteTableResponsePB* resp) {
leader_lock_.AssertAcquiredForReading();
RETURN_NOT_OK(CheckOnline());
- LOG(INFO) << Substitute("Servicing DeleteTable request from $0:\n$1",
- RequestorString(rpc), SecureShortDebugString(*req));
-
// 1. Look up the table, lock it, and mark it as removed.
scoped_refptr<TableInfo> table;
TableMetadataLock l;
- RETURN_NOT_OK(FindAndLockTable(*req, resp, LockMode::WRITE, &table, &l));
+ RETURN_NOT_OK(FindAndLockTable(req, resp, LockMode::WRITE, &table, &l));
if (l.data().is_deleted()) {
return SetupError(Status::NotFound("the table was deleted", l.data().pb.state_msg()),
resp, MasterErrorPB::TABLE_NOT_FOUND);
@@ -1788,7 +1791,9 @@ Status CatalogManager::DeleteTable(const DeleteTableRequestPB* req,
TRACE("Removing table from by-name map");
std::lock_guard<LockType> l_map(lock_);
if (table_names_map_.erase(l.data().name()) != 1) {
- PANIC_RPC(rpc, "Could not remove table from map, name=" + l.data().name());
+ LOG(FATAL) << "Could not remove table " << table->ToString()
+ << " from map in response to DeleteTable request: "
+ << SecureShortDebugString(req);
}
}
@@ -2064,19 +2069,22 @@ Status CatalogManager::ApplyAlterPartitioningSteps(
return Status::OK();
}
-Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
- AlterTableResponsePB* resp,
- rpc::RpcContext* rpc) {
+Status CatalogManager::AlterTableRpc(const AlterTableRequestPB& req,
+ AlterTableResponsePB* resp,
+ rpc::RpcContext* rpc) {
+ LOG(INFO) << Substitute("Servicing AlterTable request from $0:\n$1",
+ RequestorString(rpc), SecureShortDebugString(req));
+ return AlterTable(req, resp);
+}
+
+Status CatalogManager::AlterTable(const AlterTableRequestPB& req, AlterTableResponsePB* resp) {
leader_lock_.AssertAcquiredForReading();
RETURN_NOT_OK(CheckOnline());
- LOG(INFO) << Substitute("Servicing AlterTable request from $0:\n$1",
- RequestorString(rpc), SecureShortDebugString(*req));
-
// 1. Group the steps into schema altering steps and partition altering steps.
vector<AlterTableRequestPB::Step> alter_schema_steps;
vector<AlterTableRequestPB::Step> alter_partitioning_steps;
- for (const auto& step : req->alter_schema_steps()) {
+ for (const auto& step : req.alter_schema_steps()) {
switch (step.type()) {
case AlterTableRequestPB::ADD_COLUMN:
case AlterTableRequestPB::DROP_COLUMN:
@@ -2099,7 +2107,7 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
// 2. Lookup the table, verify if it exists, and lock it for modification.
scoped_refptr<TableInfo> table;
TableMetadataLock l;
- RETURN_NOT_OK(FindAndLockTable(*req, resp, LockMode::WRITE, &table, &l));
+ RETURN_NOT_OK(FindAndLockTable(req, resp, LockMode::WRITE, &table, &l));
if (l.data().is_deleted()) {
return SetupError(
Status::NotFound("the table was deleted", l.data().pb.state_msg()),
@@ -2131,43 +2139,43 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
resp, MasterErrorPB::INVALID_SCHEMA));
// 4. Validate and try to acquire the new table name.
- if (req->has_new_table_name()) {
+ if (req.has_new_table_name()) {
RETURN_NOT_OK(SetupError(
- ValidateIdentifier(req->new_table_name()).CloneAndPrepend("invalid table name"),
+ ValidateIdentifier(req.new_table_name()).CloneAndPrepend("invalid table name"),
resp, MasterErrorPB::INVALID_SCHEMA));
std::lock_guard<LockType> catalog_lock(lock_);
TRACE("Acquired catalog manager lock");
// Verify that the table does not exist.
- scoped_refptr<TableInfo> other_table = FindPtrOrNull(table_names_map_, req->new_table_name());
+ scoped_refptr<TableInfo> other_table = FindPtrOrNull(table_names_map_, req.new_table_name());
if (other_table != nullptr) {
return SetupError(
Status::AlreadyPresent(Substitute("table $0 already exists with id $1",
- req->new_table_name(), table->id())),
+ req.new_table_name(), table->id())),
resp, MasterErrorPB::TABLE_ALREADY_PRESENT);
}
// Reserve the new table name if possible.
- if (!InsertIfNotPresent(&reserved_table_names_, req->new_table_name())) {
+ if (!InsertIfNotPresent(&reserved_table_names_, req.new_table_name())) {
// ServiceUnavailable will cause the client to retry the create table
// request. We don't want to outright fail the request with
// 'AlreadyPresent', because a table name reservation can be rolled back
// in the case of an error. Instead, we force the client to retry at a
// later time.
return SetupError(Status::ServiceUnavailable(Substitute(
- "table name $0 is already reserved", req->new_table_name())),
+ "table name $0 is already reserved", req.new_table_name())),
resp, MasterErrorPB::TABLE_ALREADY_PRESENT);
}
- l.mutable_data()->pb.set_name(req->new_table_name());
+ l.mutable_data()->pb.set_name(req.new_table_name());
}
// Ensure that we drop our reservation upon return.
auto cleanup = MakeScopedCleanup([&] () {
- if (req->has_new_table_name()) {
+ if (req.has_new_table_name()) {
std::lock_guard<LockType> l(lock_);
- CHECK_EQ(1, reserved_table_names_.erase(req->new_table_name()));
+ CHECK_EQ(1, reserved_table_names_.erase(req.new_table_name()));
}
});
@@ -2177,7 +2185,7 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
if (!alter_partitioning_steps.empty()) {
TRACE("Apply alter partitioning");
Schema client_schema;
- RETURN_NOT_OK(SetupError(SchemaFromPB(req->schema(), &client_schema),
+ RETURN_NOT_OK(SetupError(SchemaFromPB(req.schema(), &client_schema),
resp, MasterErrorPB::UNKNOWN_ERROR));
RETURN_NOT_OK(SetupError(
ApplyAlterPartitioningSteps(l, table, client_schema, alter_partitioning_steps,
@@ -2188,7 +2196,7 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
// Set to true if columns are altered, added or dropped.
bool has_schema_changes = !alter_schema_steps.empty();
// Set to true if there are schema changes, or the table is renamed.
- bool has_metadata_changes = has_schema_changes || req->has_new_table_name();
+ bool has_metadata_changes = has_schema_changes || req.has_new_table_name();
// Set to true if there are partitioning changes.
bool has_partitioning_changes = !alter_partitioning_steps.empty();
// Set to true if metadata changes need to be applied to existing tablets.
@@ -2227,7 +2235,7 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
// HMS is available. We do not allow altering tables while the HMS is
// unavailable, because that would cause the catalogs to become unsynchronized.
if (hms_catalog_ != nullptr && has_metadata_changes) {
- const string& new_name = req->has_new_table_name() ? req->new_table_name() : table_name;
+ const string& new_name = req.has_new_table_name() ? req.new_table_name() : table_name;
Status s = hms_catalog_->AlterTable(table->id(), table_name, new_name, new_schema);
if (!s.ok()) {
@@ -2249,7 +2257,7 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
<< s.ToString();
return;
}
- const string& new_name = req->has_new_table_name() ? req->new_table_name() : table_name;
+ const string& new_name = req.has_new_table_name() ? req.new_table_name() : table_name;
WARN_NOT_OK(hms_catalog_->AlterTable(table->id(), new_name, table_name, schema),
"An error occurred while attempting to roll-back HMS table entry alteration");
@@ -2299,12 +2307,13 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
// Take the global catalog manager lock in order to modify the global table
// and tablets indices.
std::lock_guard<LockType> lock(lock_);
- if (req->has_new_table_name()) {
+ if (req.has_new_table_name()) {
if (table_names_map_.erase(table_name) != 1) {
- PANIC_RPC(rpc, Substitute(
- "Could not remove table (name $0) from map", table_name));
+ LOG(FATAL) << "Could not remove table " << table->ToString()
+ << " from map in response to AlterTable request: "
+ << SecureShortDebugString(req);
}
- InsertOrDie(&table_names_map_, req->new_table_name(), table);
+ InsertOrDie(&table_names_map_, req.new_table_name(), table);
}
// Insert new tablets into the global tablet map. After this, the tablets
http://git-wip-us.apache.org/repos/asf/kudu/blob/ca3e3875/src/kudu/master/catalog_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index 957c3ea..447a391 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -533,21 +533,21 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
Status IsCreateTableDone(const IsCreateTableDoneRequestPB* req,
IsCreateTableDoneResponsePB* resp);
- // Delete the specified table
+ // Delete the specified table in response to a DeleteTableRequest RPC.
//
// The RPC context is provided for logging/tracing purposes,
// but this function does not itself respond to the RPC.
- Status DeleteTable(const DeleteTableRequestPB* req,
- DeleteTableResponsePB* resp,
- rpc::RpcContext* rpc);
+ Status DeleteTableRpc(const DeleteTableRequestPB& req,
+ DeleteTableResponsePB* resp,
+ rpc::RpcContext* rpc);
- // Alter the specified table
+ // Alter the specified table in response to an AlterTableRequest RPC.
//
// The RPC context is provided for logging/tracing purposes,
// but this function does not itself respond to the RPC.
- Status AlterTable(const AlterTableRequestPB* req,
- AlterTableResponsePB* resp,
- rpc::RpcContext* rpc);
+ Status AlterTableRpc(const AlterTableRequestPB& req,
+ AlterTableResponsePB* resp,
+ rpc::RpcContext* rpc);
// Get the information about an in-progress alter operation
//
@@ -664,6 +664,15 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
typedef std::unordered_map<std::string, scoped_refptr<TableInfo>> TableInfoMap;
typedef std::unordered_map<std::string, scoped_refptr<TabletInfo>> TabletInfoMap;
+ // Delete the specified table in the catalog.
+ Status DeleteTable(const DeleteTableRequestPB& req,
+ DeleteTableResponsePB* resp) WARN_UNUSED_RESULT;
+
+
+ // Alter the specified table in the catalog.
+ Status AlterTable(const AlterTableRequestPB& req,
+ AlterTableResponsePB* resp) WARN_UNUSED_RESULT;
+
// Called by SysCatalog::SysCatalogStateChanged when this node
// becomes the leader of a consensus configuration. Executes
// PrepareForLeadershipTask() via 'worker_pool_'.
http://git-wip-us.apache.org/repos/asf/kudu/blob/ca3e3875/src/kudu/master/master_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/master_service.cc b/src/kudu/master/master_service.cc
index 5ccaa66..b71f470 100644
--- a/src/kudu/master/master_service.cc
+++ b/src/kudu/master/master_service.cc
@@ -332,7 +332,7 @@ void MasterServiceImpl::DeleteTable(const DeleteTableRequestPB* req,
return;
}
- Status s = server_->catalog_manager()->DeleteTable(req, resp, rpc);
+ Status s = server_->catalog_manager()->DeleteTableRpc(*req, resp, rpc);
CheckRespErrorOrSetUnknown(s, resp);
rpc->RespondSuccess();
}
@@ -345,7 +345,7 @@ void MasterServiceImpl::AlterTable(const AlterTableRequestPB* req,
return;
}
- Status s = server_->catalog_manager()->AlterTable(req, resp, rpc);
+ Status s = server_->catalog_manager()->AlterTableRpc(*req, resp, rpc);
CheckRespErrorOrSetUnknown(s, resp);
rpc->RespondSuccess();
}