You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by gr...@apache.org on 2019/06/08 18:28:40 UTC
[kudu] 01/02: [c++] Support table rename between scan token
creation and rehydration
This is an automated email from the ASF dual-hosted git repository.
granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit b0ee39962d0b9766a7d9de9b7207524495319631
Author: Will Berkeley <wd...@gmail.com>
AuthorDate: Wed Jun 5 13:38:33 2019 -0700
[c++] Support table rename between scan token creation and rehydration
Previously, if a scan token was created against table 'foo', and table
'foo' was renamed to table 'bar', rehydrating the scan token into a
scanner would either
- fail if there was no longer a table named 'foo', or
- attempt to scan a different table if another table had been renamed to 'foo'.
This patch alters how the C++ client manages scan tokens: tokens now
record the table id, and the client prefers the table id over the table
name when identifying the table to scan. This eliminates both problems above.
This is a follow-up to the Java client patch for the same issue (that
patch includes the necessary protobuf changes).
Change-Id: Ib4d48513dff67012f26a99877b168d777d3049fd
Reviewed-on: http://gerrit.cloudera.org:8080/13521
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mp...@apache.org>
---
src/kudu/client/client-internal.cc | 53 ++++++++++++++++++++++++++++++++--
src/kudu/client/client-internal.h | 11 ++++++-
src/kudu/client/client.cc | 35 ++++++----------------
src/kudu/client/scan_token-internal.cc | 16 +++++++++-
src/kudu/client/scan_token-test.cc | 48 ++++++++++++++++++++++++++++++
5 files changed, 132 insertions(+), 31 deletions(-)
diff --git a/src/kudu/client/client-internal.cc b/src/kudu/client/client-internal.cc
index 09e42bb..fc310c0 100644
--- a/src/kudu/client/client-internal.cc
+++ b/src/kudu/client/client-internal.cc
@@ -453,17 +453,61 @@ bool KuduClient::Data::FetchCachedAuthzToken(const string& table_id, SignedToken
return authz_token_cache_.Fetch(table_id, token);
}
+Status KuduClient::Data::OpenTable(KuduClient* client,
+ const TableIdentifierPB& table_identifier,
+ client::sp::shared_ptr<KuduTable>* table) {
+ KuduSchema schema;
+ string table_id;
+ string table_name;
+ int num_replicas;
+ PartitionSchema partition_schema;
+ MonoTime deadline = MonoTime::Now() + default_admin_operation_timeout_;
+ RETURN_NOT_OK(GetTableSchema(client,
+ deadline,
+ table_identifier,
+ &schema,
+ &partition_schema,
+ &table_id,
+ &table_name,
+ &num_replicas));
+
+ // When the table name is specified, use the caller-provided table name.
+ // This reduces surprises, e.g., when the HMS integration is on and table
+ // names are case-insensitive.
+ string effective_table_name = table_identifier.has_table_name() ?
+ table_identifier.table_name() :
+ table_name;
+
+ // TODO(wdberkeley): In the future, probably will look up the table in some
+ // map to reuse KuduTable instances.
+ table->reset(new KuduTable(client->shared_from_this(),
+ effective_table_name, table_id, num_replicas,
+ schema, partition_schema));
+
+ // When opening a table, clear the existing cached non-covered range entries.
+ // This avoids surprises where a new table instance won't be able to see the
+ // current range partitions of a table for up to the ttl.
+ meta_cache_->ClearNonCoveredRangeEntries(table_id);
+
+ return Status::OK();
+}
+
Status KuduClient::Data::GetTableSchema(KuduClient* client,
- const string& table_name,
const MonoTime& deadline,
+ const TableIdentifierPB& table,
KuduSchema* schema,
PartitionSchema* partition_schema,
- string* table_id,
+ std::string* table_id,
+ std::string* table_name,
int* num_replicas) {
GetTableSchemaRequestPB req;
GetTableSchemaResponsePB resp;
- req.mutable_table()->set_table_name(table_name);
+ if (table.has_table_id()) {
+ req.mutable_table()->set_table_id(table.table_id());
+ } else {
+ req.mutable_table()->set_table_name(table.table_name());
+ }
Synchronizer sync;
AsyncLeaderMasterRpc<GetTableSchemaRequestPB, GetTableSchemaResponsePB> rpc(
deadline, client, BackoffType::EXPONENTIAL, req, &resp,
@@ -486,6 +530,9 @@ Status KuduClient::Data::GetTableSchema(KuduClient* client,
if (partition_schema) {
*partition_schema = std::move(new_partition_schema);
}
+ if (table_name) {
+ *table_name = resp.table_name();
+ }
if (table_id) {
*table_id = resp.table_id();
}
diff --git a/src/kudu/client/client-internal.h b/src/kudu/client/client-internal.h
index 2098bda..79ffd79 100644
--- a/src/kudu/client/client-internal.h
+++ b/src/kudu/client/client-internal.h
@@ -139,12 +139,21 @@ class KuduClient::Data {
master::TableIdentifierPB table,
const MonoTime& deadline);
+ // Open the table identified by 'table_identifier'.
+ Status OpenTable(KuduClient* client,
+ const master::TableIdentifierPB& table_identifier,
+ client::sp::shared_ptr<KuduTable>* table);
+
+ // Retrieve information about the table identified by 'table', which should
+ // set a table id or a table name (the id takes precedence if both are set).
+ // 'partition_schema', 'table_id' and 'table_name' may be null if unneeded.
Status GetTableSchema(KuduClient* client,
- const std::string& table_name,
const MonoTime& deadline,
+ const master::TableIdentifierPB& table,
KuduSchema* schema,
PartitionSchema* partition_schema,
std::string* table_id,
+ std::string* table_name,
int* num_replicas);
Status InitLocalHostNames();
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index 8cde561..42ce211 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -427,12 +427,15 @@ Status KuduClient::IsAlterTableInProgress(const string& table_name,
Status KuduClient::GetTableSchema(const string& table_name,
KuduSchema* schema) {
MonoTime deadline = MonoTime::Now() + default_admin_operation_timeout();
+ TableIdentifierPB table;
+ table.set_table_name(table_name);
return data_->GetTableSchema(this,
- table_name,
deadline,
+ table,
schema,
nullptr, // partition schema
nullptr, // table id
+ nullptr, // table name
nullptr); // number of replicas
}
@@ -500,31 +503,11 @@ Status KuduClient::TableExists(const string& table_name, bool* exists) {
Status KuduClient::OpenTable(const string& table_name,
shared_ptr<KuduTable>* table) {
- KuduSchema schema;
- string table_id;
- int num_replicas;
- PartitionSchema partition_schema;
- MonoTime deadline = MonoTime::Now() + default_admin_operation_timeout();
- RETURN_NOT_OK(data_->GetTableSchema(this,
- table_name,
- deadline,
- &schema,
- &partition_schema,
- &table_id,
- &num_replicas));
-
- // TODO: in the future, probably will look up the table in some map to reuse
- // KuduTable instances.
- table->reset(new KuduTable(shared_from_this(),
- table_name, table_id, num_replicas,
- schema, partition_schema));
-
- // When opening a table, clear the existing cached non-covered range entries.
- // This avoids surprises where a new table instance won't be able to see the
- // current range partitions of a table for up to the ttl.
- data_->meta_cache_->ClearNonCoveredRangeEntries(table_id);
-
- return Status::OK();
+ TableIdentifierPB table_identifier;
+ table_identifier.set_table_name(table_name);
+ return data_->OpenTable(this,
+ table_identifier,
+ table);
}
shared_ptr<KuduSession> KuduClient::NewSession() {
diff --git a/src/kudu/client/scan_token-internal.cc b/src/kudu/client/scan_token-internal.cc
index 46bc270..86f73bb 100644
--- a/src/kudu/client/scan_token-internal.cc
+++ b/src/kudu/client/scan_token-internal.cc
@@ -48,6 +48,7 @@
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/substitute.h"
+#include "kudu/master/master.pb.h"
#include "kudu/util/async_util.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/net_util.h"
@@ -60,6 +61,9 @@ using std::vector;
using strings::Substitute;
namespace kudu {
+
+using master::TableIdentifierPB;
+
namespace client {
using internal::MetaCache;
@@ -103,8 +107,17 @@ Status KuduScanToken::Data::PBIntoScanner(KuduClient* client,
}
}
+ TableIdentifierPB table_identifier;
+ if (message.has_table_id()) {
+ table_identifier.set_table_id(message.table_id());
+ }
+ if (message.has_table_name()) {
+ table_identifier.set_table_name(message.table_name());
+ }
sp::shared_ptr<KuduTable> table;
- RETURN_NOT_OK(client->OpenTable(message.table_name(), &table));
+ RETURN_NOT_OK(client->data_->OpenTable(client,
+ table_identifier,
+ &table));
Schema* schema = table->schema().schema_;
unique_ptr<KuduScanner> scan_builder(new KuduScanner(table.get()));
@@ -230,6 +243,7 @@ Status KuduScanTokenBuilder::Data::Build(vector<KuduScanToken*>* tokens) {
ScanTokenPB pb;
+ pb.set_table_id(table->id());
pb.set_table_name(table->name());
RETURN_NOT_OK(SchemaToColumnPBs(*configuration_.projection(), pb.mutable_projected_columns(),
SCHEMA_PB_WITHOUT_STORAGE_ATTRIBUTES | SCHEMA_PB_WITHOUT_IDS));
diff --git a/src/kudu/client/scan_token-test.cc b/src/kudu/client/scan_token-test.cc
index fe94ff8..ebfb160 100644
--- a/src/kudu/client/scan_token-test.cc
+++ b/src/kudu/client/scan_token-test.cc
@@ -16,6 +16,7 @@
// under the License.
#include <atomic>
+#include <cstddef>
#include <cstdint>
#include <memory>
#include <string>
@@ -27,6 +28,7 @@
#include <glog/logging.h>
#include <gtest/gtest.h>
+#include "kudu/client/client-test-util.h"
#include "kudu/client/client.h"
#include "kudu/client/client.pb.h"
#include "kudu/client/scan_batch.h"
@@ -637,5 +639,51 @@ TEST_F(ScanTokenTest, TestConcurrentAlterTable) {
delete scanner_ptr;
}
+// Tests the results of creating scan tokens, renaming the table being
+// scanned, and then executing the scan tokens.
+TEST_F(ScanTokenTest, TestConcurrentRenameTable) {
+ const char* kTableName = "scan-token-rename";
+ // Create schema
+ KuduSchema schema;
+ {
+ KuduSchemaBuilder builder;
+ builder.AddColumn("key")->NotNull()->Type(KuduColumnSchema::INT64)->PrimaryKey();
+ builder.AddColumn("a")->NotNull()->Type(KuduColumnSchema::INT64);
+ ASSERT_OK(builder.Build(&schema));
+ }
+
+ // Create table
+ shared_ptr<KuduTable> table;
+ {
+ unique_ptr<client::KuduTableCreator> table_creator(client_->NewTableCreator());
+ ASSERT_OK(table_creator->table_name(kTableName)
+ .schema(&schema)
+ .set_range_partition_columns({})
+ .num_replicas(1)
+ .Create());
+ ASSERT_OK(client_->OpenTable(kTableName, &table));
+ }
+
+ vector<KuduScanToken*> tokens;
+ ASSERT_OK(KuduScanTokenBuilder(table.get()).Build(&tokens));
+ ASSERT_EQ(1, tokens.size());
+ unique_ptr<KuduScanToken> token(tokens[0]);
+
+ // Rename the table.
+ {
+ unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
+ table_alterer->RenameTo("scan-token-rename-renamed");
+ ASSERT_OK(table_alterer->Alter());
+ }
+
+ KuduScanner* scanner_ptr;
+ ASSERT_OK(token->IntoKuduScanner(&scanner_ptr));
+ size_t row_count;
+ ASSERT_OK(CountRowsWithRetries(scanner_ptr, &row_count));
+ ASSERT_EQ(0, row_count);
+ delete scanner_ptr;
+}
+
+
} // namespace client
} // namespace kudu