You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2022/08/04 19:43:30 UTC
[kudu] branch master updated: KUDU-2671 range-specific hash schemas for 'kudu table add_range_partition'
This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new d92f0f8eb KUDU-2671 range-specific hash schemas for 'kudu table add_range_partition'
d92f0f8eb is described below
commit d92f0f8ebdf922e63d7bccf1f1da9de614d631b4
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Wed Aug 3 19:03:29 2022 -0700
KUDU-2671 range-specific hash schemas for 'kudu table add_range_partition'
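This patch adds support for range-specific hash schemas to the
'kudu table add_range_partition' CLI tool via the new --hash_schema flag,
which takes a JSON object; without the flag, the new range keeps using the
table-wide hash schema. This patch also adds a test scenario covering the
newly introduced functionality.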
An example of usage:
kudu table add_range_partition $KUDU_MASTER my_table [0] [1] \
--hash_schema='{"hash_schema": [{"columns": ["c"], "num_buckets": 5}]}'
Change-Id: I3832312b6ebfb397bb3083931f6d53039afc5e9b
Reviewed-on: http://gerrit.cloudera.org:8080/18814
Reviewed-by: Abhishek Chennaka <ac...@cloudera.com>
Reviewed-by: Mahesh Reddy <mr...@cloudera.com>
Tested-by: Alexey Serbin <al...@apache.org>
Reviewed-by: Attila Bukor <ab...@apache.org>
---
src/kudu/tools/kudu-admin-test.cc | 159 +++++++++++++++++++++++++++++++++++-
src/kudu/tools/tool.proto | 5 ++
src/kudu/tools/tool_action_table.cc | 55 +++++++++++--
3 files changed, 210 insertions(+), 9 deletions(-)
diff --git a/src/kudu/tools/kudu-admin-test.cc b/src/kudu/tools/kudu-admin-test.cc
index 4d9706b4c..1480ce6f3 100644
--- a/src/kudu/tools/kudu-admin-test.cc
+++ b/src/kudu/tools/kudu-admin-test.cc
@@ -25,7 +25,6 @@
#include <memory>
#include <ostream>
#include <string>
-#include <thread>
#include <tuple>
#include <unordered_map>
#include <unordered_set>
@@ -142,7 +141,7 @@ using std::deque;
using std::endl;
using std::ostringstream;
using std::string;
-using std::thread;
+using std::pair;
using std::tuple;
using std::unique_ptr;
using std::vector;
@@ -3214,6 +3213,162 @@ TEST_F(AdminCliTest, TestAddAndDropRangePartitionForMultipleRangeColumnsTable) {
});
}
+// Add ranges with custom hash schemas via 'kudu table add_range_partition --hash_schema',
+// verify them via 'kudu table describe' and row inserts, then drop all the ranges.
+TEST_F(AdminCliTest, AddAndDropRangeWithCustomHashSchema) {
+ FLAGS_num_tablet_servers = 1;
+ FLAGS_num_replicas = 1;
+
+ NO_FATALS(BuildAndStart());
+
+ const string& master_addr = cluster_->master()->bound_rpc_addr().ToString();
+ constexpr const char* const kTestTableName = "custom_hash_schemas";
+ constexpr const char* const kC0 = "c0";
+ constexpr const char* const kC1 = "c1";
+ constexpr const char* const kC2 = "c2";
+
+ {
+ KuduSchemaBuilder builder;
+ builder.AddColumn(kC0)->Type(KuduColumnSchema::INT8)->NotNull();
+ builder.AddColumn(kC1)->Type(KuduColumnSchema::INT16)->NotNull();
+ builder.AddColumn(kC2)->Type(KuduColumnSchema::STRING);
+ builder.SetPrimaryKey({ kC0, kC1 });
+ KuduSchema schema;
+ ASSERT_OK(builder.Build(&schema));
+
+ // Create a table with left-unbounded range partition having the
+ // table-wide hash schema.
+ unique_ptr<KuduPartialRow> l(schema.NewRow());
+ unique_ptr<KuduPartialRow> u(schema.NewRow());
+ ASSERT_OK(u->SetInt8(kC0, 0));
+
+ unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+ ASSERT_OK(table_creator->table_name(kTestTableName)
+ .schema(&schema)
+ .set_range_partition_columns({ kC0 })
+ .add_hash_partitions({ kC1 }, 2)
+ .add_range_partition(l.release(), u.release())
+ .num_replicas(FLAGS_num_replicas)
+ .Create());
+ }
+
+ string stdout;
+ string stderr;
+
+ // Add a range partition with custom hash schema using the kudu CLI tool.
+ {
+ constexpr const char* const kHashSchemaJson = R"*({
+ "hash_schema": [
+ { "columns": ["c1"], "num_buckets": 5, "seed": 8 }
+ ]
+ })*";
+ const auto s = RunKuduTool({
+ "table",
+ "add_range_partition",
+ master_addr,
+ kTestTableName,
+ "[0]",
+ "[1]",
+ Substitute("--hash_schema=$0", kHashSchemaJson),
+ }, &stdout, &stderr);
+ ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);
+ }
+ {
+ const auto s = RunKuduTool({
+ "table",
+ "describe",
+ master_addr,
+ kTestTableName,
+ }, &stdout, &stderr);
+ ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);
+ ASSERT_STR_CONTAINS(stdout,
+ "PARTITION 0 <= VALUES < 1 HASH(c1) PARTITIONS 5");
+ }
+
+ // Insert a row into the newly added range partition.
+ {
+ client::sp::shared_ptr<KuduTable> table;
+ ASSERT_OK(client_->OpenTable(kTestTableName, &table));
+
+ auto session = client_->NewSession();
+ unique_ptr<KuduInsert> insert(table->NewInsert());
+ auto* row = insert->mutable_row();
+ ASSERT_OK(row->SetInt8(kC0, 0));
+ ASSERT_OK(row->SetInt16(kC1, 0));
+ ASSERT_OK(row->SetString(kC2, "0"));
+ ASSERT_OK(session->Apply(insert.release()));
+ ASSERT_OK(session->Flush());
+ ASSERT_EQ(1, CountTableRows(table.get()));
+ }
+
+ // Add unbounded range using the kudu CLI tool.
+ {
+ constexpr const char* const kHashSchemaJson = R"*({
+ "hash_schema": [ { "columns": ["c0", "c1"], "num_buckets": 3 } ] })*";
+ const auto s = RunKuduTool({
+ "table",
+ "add_range_partition",
+ master_addr,
+ kTestTableName,
+ "[1]",
+ "[]",
+ Substitute("--hash_schema=$0", kHashSchemaJson),
+ }, &stdout, &stderr);
+ ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);
+ }
+ {
+ const auto s = RunKuduTool({
+ "table",
+ "describe",
+ master_addr,
+ kTestTableName,
+ }, &stdout, &stderr);
+ ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);
+ ASSERT_STR_CONTAINS(stdout,
+ "PARTITION 0 <= VALUES < 1 HASH(c1) PARTITIONS 5");
+ ASSERT_STR_CONTAINS(stdout,
+ "PARTITION 1 <= VALUES HASH(c0, c1) PARTITIONS 3");
+ }
+
+ // Insert a row into the newly added range partition.
+ {
+ client::sp::shared_ptr<KuduTable> table;
+ ASSERT_OK(client_->OpenTable(kTestTableName, &table));
+ auto session = client_->NewSession();
+ unique_ptr<KuduInsert> insert(table->NewInsert());
+ auto* row = insert->mutable_row();
+ ASSERT_OK(row->SetInt8(kC0, 10));
+ ASSERT_OK(row->SetInt16(kC1, 10));
+ ASSERT_OK(row->SetString(kC2, "10"));
+ ASSERT_OK(session->Apply(insert.release()));
+ ASSERT_OK(session->Flush());
+ ASSERT_EQ(2, CountTableRows(table.get()));
+ }
+
+ // Drop all the ranges one-by-one.
+ const vector<pair<string, string>> kRangesStr = { {"", "0"}, {"0", "1"}, {"1", ""} };
+
+ for (const auto& [lower, upper] : kRangesStr) {
+ SCOPED_TRACE(Substitute("range ['$0', '$1')", lower, upper));
+ const auto s = RunKuduTool({
+ "table",
+ "drop_range_partition",
+ master_addr,
+ kTestTableName,
+ Substitute("[$0]", lower),
+ Substitute("[$0]", upper),
+ }, &stdout, &stderr);
+ ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);
+ }
+
+ // There should be 0 rows left.
+ client::sp::shared_ptr<KuduTable> table;
+ ASSERT_OK(client_->OpenTable(kTestTableName, &table));
+ ASSERT_EVENTUALLY([&]() {
+ ASSERT_EQ(0, CountTableRows(table.get()));
+ });
+}
+
namespace {
constexpr const char* kPrincipal = "oryx";
diff --git a/src/kudu/tools/tool.proto b/src/kudu/tools/tool.proto
index f98d8cd84..f80e462ba 100644
--- a/src/kudu/tools/tool.proto
+++ b/src/kudu/tools/tool.proto
@@ -454,6 +454,11 @@ message PartitionPB {
repeated RangeWithHashSchemaPB custom_hash_schema_ranges = 4;
}
+ // A standalone hash schema, e.g. parsed from the JSON of the --hash_schema flag.
+ message HashSchemaPB {
+ repeated HashPartitionPB hash_schema = 1; // One entry per hash dimension.
+ }
+
// Table-wide hash schema.
repeated HashPartitionPB hash_partitions = 1;
// Range partitioning information.
diff --git a/src/kudu/tools/tool_action_table.cc b/src/kudu/tools/tool_action_table.cc
index b764781de..d43c2b299 100644
--- a/src/kudu/tools/tool_action_table.cc
+++ b/src/kudu/tools/tool_action_table.cc
@@ -84,9 +84,11 @@ using kudu::client::KuduTableCreator;
using kudu::client::KuduTableStatistics;
using kudu::client::KuduValue;
using kudu::client::internal::ReplicaController;
+using kudu::tools::PartitionPB;
using std::cerr;
using std::cout;
using std::endl;
+using std::make_unique;
using std::map;
using std::pair;
using std::set;
@@ -150,6 +152,10 @@ DEFINE_string(lower_bound_type, "INCLUSIVE_BOUND",
DEFINE_string(upper_bound_type, "EXCLUSIVE_BOUND",
"The type of the upper bound, either inclusive or exclusive. "
"Defaults to exclusive. This flag is case-insensitive.");
+DEFINE_string(hash_schema, "", // Empty: the added range uses the table-wide hash schema.
+ "String representation of range-specific hash schema as a JSON "
+ "object, e.g. "
+ "{\"hash_schema\": [{\"columns\": [\"c0\"], \"num_buckets\": 5}]}");
DEFINE_int32(scan_batch_size, -1,
"The size for scan results batches, in bytes. A negative value "
"means the server-side default is used, where the server-side "
@@ -1000,18 +1006,52 @@ Status ModifyRangePartition(const RunnerContext& context, PartitionAction action
upper_bound.get()));
KuduTableCreator::RangePartitionBound lower_bound_type;
- KuduTableCreator::RangePartitionBound upper_bound_type;
+ RETURN_NOT_OK(convert_bounds_type(
+ "lower bound", FLAGS_lower_bound_type, &lower_bound_type));
- RETURN_NOT_OK(convert_bounds_type("lower bound", FLAGS_lower_bound_type, &lower_bound_type));
- RETURN_NOT_OK(convert_bounds_type("upper bound", FLAGS_upper_bound_type, &upper_bound_type));
+ KuduTableCreator::RangePartitionBound upper_bound_type;
+ RETURN_NOT_OK(convert_bounds_type(
+ "upper bound", FLAGS_upper_bound_type, &upper_bound_type));
+
+ const auto& hash_schema_str = FLAGS_hash_schema;
+ PartitionPB::HashSchemaPB hash_schema;
+ if (!hash_schema_str.empty()) {
+ JsonParseOptions opts;
+ opts.case_insensitive_enum_parsing = true;
+ if (const auto& s = JsonStringToMessage(
+ hash_schema_str, &hash_schema, opts); !s.ok()) {
+ return Status::InvalidArgument(
+ Substitute("unable to parse JSON: $0", hash_schema_str),
+ s.error_message().ToString());
+ }
+ }
unique_ptr<KuduTableAlterer> alterer(client->NewTableAlterer(table_name));
if (action == PartitionAction::ADD) {
- return alterer->AddRangePartition(lower_bound.release(),
- upper_bound.release(),
- lower_bound_type,
- upper_bound_type)->Alter();
+ if (hash_schema_str.empty()) {
+ // Add range partition with table-wide hash schema.
+ return alterer->AddRangePartition(lower_bound.release(),
+ upper_bound.release(),
+ lower_bound_type,
+ upper_bound_type)->Alter();
+ }
+
+ // Add range partition with custom hash schema.
+ auto p = make_unique<KuduRangePartition>(lower_bound.release(),
+ upper_bound.release(),
+ lower_bound_type,
+ upper_bound_type);
+ for (const auto& dimension_pb : hash_schema.hash_schema()) {
+ vector<string> columns;
+ for (const auto& column : dimension_pb.columns()) {
+ columns.emplace_back(column);
+ }
+ p->add_hash_partitions(
+ columns, dimension_pb.num_buckets(), dimension_pb.seed());
+ }
+ return alterer->AddRangePartition(p.release())->Alter();
}
+
DCHECK_EQ(PartitionAction::DROP, action);
return alterer->DropRangePartition(lower_bound.release(),
upper_bound.release(),
@@ -1790,6 +1830,7 @@ unique_ptr<Mode> BuildTableMode() {
"the upper range partition will be unbounded" })
.AddOptionalParameter("lower_bound_type")
.AddOptionalParameter("upper_bound_type")
+ .AddOptionalParameter("hash_schema")
.Build();
unique_ptr<Action> column_set_default =