You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2022/08/04 19:43:30 UTC

[kudu] branch master updated: KUDU-2671 range-specific hash schemas for 'kudu table add_range_partition'

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new d92f0f8eb KUDU-2671 range-specific hash schemas for 'kudu table add_range_partition'
d92f0f8eb is described below

commit d92f0f8ebdf922e63d7bccf1f1da9de614d631b4
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Wed Aug 3 19:03:29 2022 -0700

    KUDU-2671 range-specific hash schemas for 'kudu table add_range_partition'
    
    This patch adds support for range-specific hash schemas to the
    'kudu table add_range_partition' CLI tool via the new --hash_schema
    flag.  The patch also contains a test scenario to cover the newly
    introduced functionality.
    
    An example of usage:
    
      kudu table add_range_partition $KUDU_MASTER my_table [0] [1] \
        --hash_schema='{"hash_schema": [{"columns": ["c"], "num_buckets": 5}]}'
    
    Change-Id: I3832312b6ebfb397bb3083931f6d53039afc5e9b
    Reviewed-on: http://gerrit.cloudera.org:8080/18814
    Reviewed-by: Abhishek Chennaka <ac...@cloudera.com>
    Reviewed-by: Mahesh Reddy <mr...@cloudera.com>
    Tested-by: Alexey Serbin <al...@apache.org>
    Reviewed-by: Attila Bukor <ab...@apache.org>
---
 src/kudu/tools/kudu-admin-test.cc   | 159 +++++++++++++++++++++++++++++++++++-
 src/kudu/tools/tool.proto           |   5 ++
 src/kudu/tools/tool_action_table.cc |  55 +++++++++++--
 3 files changed, 210 insertions(+), 9 deletions(-)

diff --git a/src/kudu/tools/kudu-admin-test.cc b/src/kudu/tools/kudu-admin-test.cc
index 4d9706b4c..1480ce6f3 100644
--- a/src/kudu/tools/kudu-admin-test.cc
+++ b/src/kudu/tools/kudu-admin-test.cc
@@ -25,7 +25,6 @@
 #include <memory>
 #include <ostream>
 #include <string>
-#include <thread>
 #include <tuple>
 #include <unordered_map>
 #include <unordered_set>
@@ -142,7 +141,7 @@ using std::deque;
 using std::endl;
 using std::ostringstream;
 using std::string;
-using std::thread;
+using std::pair;
 using std::tuple;
 using std::unique_ptr;
 using std::vector;
@@ -3214,6 +3213,162 @@ TEST_F(AdminCliTest, TestAddAndDropRangePartitionForMultipleRangeColumnsTable) {
   });
 }
 
+TEST_F(AdminCliTest, AddAndDropRangeWithCustomHashSchema) {
+  FLAGS_num_tablet_servers = 1;
+  FLAGS_num_replicas = 1;
+
+  NO_FATALS(BuildAndStart());
+
+  const string& master_addr = cluster_->master()->bound_rpc_addr().ToString();
+  constexpr const char* const kTestTableName = "custom_hash_schemas";
+  constexpr const char* const kC0 = "c0";
+  constexpr const char* const kC1 = "c1";
+  constexpr const char* const kC2 = "c2";
+
+  {
+    KuduSchemaBuilder builder;
+    builder.AddColumn(kC0)->Type(KuduColumnSchema::INT8)->NotNull();
+    builder.AddColumn(kC1)->Type(KuduColumnSchema::INT16)->NotNull();
+    builder.AddColumn(kC2)->Type(KuduColumnSchema::STRING);
+    builder.SetPrimaryKey({ kC0, kC1 });
+    KuduSchema schema;
+    ASSERT_OK(builder.Build(&schema));
+
+    // Create a table with left-unbounded range partition having the
+    // table-wide hash schema.
+    unique_ptr<KuduPartialRow> l(schema.NewRow());
+    unique_ptr<KuduPartialRow> u(schema.NewRow());
+    ASSERT_OK(u->SetInt8(kC0, 0));
+
+    unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+    ASSERT_OK(table_creator->table_name(kTestTableName)
+        .schema(&schema)
+        .set_range_partition_columns({ kC0 })
+        .add_hash_partitions({ kC1 }, 2)
+        .add_range_partition(l.release(), u.release())
+        .num_replicas(FLAGS_num_replicas)
+        .Create());
+  }
+
+  string stdout;
+  string stderr;
+
+  // Add a range partition with custom hash schema using the kudu CLI tool.
+  {
+    constexpr const char* const kHashSchemaJson = R"*({
+      "hash_schema": [
+        { "columns": ["c1"], "num_buckets": 5, "seed": 8 }
+      ]
+    })*";
+    const auto s = RunKuduTool({
+      "table",
+      "add_range_partition",
+      master_addr,
+      kTestTableName,
+      "[0]",
+      "[1]",
+      Substitute("--hash_schema=$0", kHashSchemaJson),
+    }, &stdout, &stderr);
+    ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);
+  }
+  {
+    const auto s = RunKuduTool({
+      "table",
+      "describe",
+      master_addr,
+      kTestTableName,
+    }, &stdout, &stderr);
+    ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);
+    ASSERT_STR_CONTAINS(stdout,
+                        "PARTITION 0 <= VALUES < 1 HASH(c1) PARTITIONS 5");
+  }
+
+  // Insert a row into the newly added range partition.
+  {
+    client::sp::shared_ptr<KuduTable> table;
+    ASSERT_OK(client_->OpenTable(kTestTableName, &table));
+
+    auto session = client_->NewSession();
+    unique_ptr<KuduInsert> insert(table->NewInsert());
+    auto* row = insert->mutable_row();
+    ASSERT_OK(row->SetInt8(kC0, 0));
+    ASSERT_OK(row->SetInt16(kC1, 0));
+    ASSERT_OK(row->SetString(kC2, "0"));
+    ASSERT_OK(session->Apply(insert.release()));
+    ASSERT_OK(session->Flush());
+    ASSERT_EQ(1, CountTableRows(table.get()));
+  }
+
+  // Add unbounded range using the kudu CLI tool.
+  {
+    constexpr const char* const kHashSchemaJson = R"*({
+      "hash_schema": [ { "columns": ["c0", "c1"], "num_buckets": 3 } ] })*";
+    const auto s = RunKuduTool({
+      "table",
+      "add_range_partition",
+      master_addr,
+      kTestTableName,
+      "[1]",
+      "[]",
+      Substitute("--hash_schema=$0", kHashSchemaJson),
+    }, &stdout, &stderr);
+    ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);
+  }
+  {
+    const auto s = RunKuduTool({
+      "table",
+      "describe",
+      master_addr,
+      kTestTableName,
+    }, &stdout, &stderr);
+    ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);
+    ASSERT_STR_CONTAINS(stdout,
+                        "PARTITION 0 <= VALUES < 1 HASH(c1) PARTITIONS 5");
+    ASSERT_STR_CONTAINS(stdout,
+                        "PARTITION 1 <= VALUES HASH(c0, c1) PARTITIONS 3");
+  }
+
+  // Insert a row into the newly added range partition.
+  {
+    client::sp::shared_ptr<KuduTable> table;
+    ASSERT_OK(client_->OpenTable(kTestTableName, &table));
+    auto session = client_->NewSession();
+    unique_ptr<KuduInsert> insert(table->NewInsert());
+    auto* row = insert->mutable_row();
+    ASSERT_OK(row->SetInt8(kC0, 10));
+    ASSERT_OK(row->SetInt16(kC1, 10));
+    ASSERT_OK(row->SetString(kC2, "10"));
+    ASSERT_OK(session->Apply(insert.release()));
+    ASSERT_OK(session->Flush());
+    ASSERT_EQ(2, CountTableRows(table.get()));
+  }
+
+  // Drop all the ranges one-by-one.
+  const vector<pair<string, string>> kRangesStr = {
+    {"", "0"}, {"0", "1"}, {"1", ""},
+  };
+
+  for (const std::pair<string, string>& r : kRangesStr) {
+    SCOPED_TRACE(Substitute("range ['$0', '$1')", r.first, r.second));
+    const auto s = RunKuduTool({
+      "table",
+      "drop_range_partition",
+      master_addr,
+      kTestTableName,
+      Substitute("[$0]", r.first),
+      Substitute("[$0]", r.second),
+    }, &stdout, &stderr);
+    ASSERT_TRUE(s.ok()) << ToolRunInfo(s, stdout, stderr);
+  }
+
+  // There should be 0 rows left.
+  client::sp::shared_ptr<KuduTable> table;
+  ASSERT_OK(client_->OpenTable(kTestTableName, &table));
+  ASSERT_EVENTUALLY([&]() {
+    ASSERT_EQ(0, CountTableRows(table.get()));
+  });
+}
+
 namespace {
 constexpr const char* kPrincipal = "oryx";
 
diff --git a/src/kudu/tools/tool.proto b/src/kudu/tools/tool.proto
index f98d8cd84..f80e462ba 100644
--- a/src/kudu/tools/tool.proto
+++ b/src/kudu/tools/tool.proto
@@ -454,6 +454,11 @@ message PartitionPB {
     repeated RangeWithHashSchemaPB custom_hash_schema_ranges = 4;
   }
 
+  // A standalone message representing a hash schema.
+  message HashSchemaPB {
+    repeated HashPartitionPB hash_schema = 1;
+  }
+
   // Table-wide hash schema.
   repeated HashPartitionPB hash_partitions = 1;
   // Range partitioning information.
diff --git a/src/kudu/tools/tool_action_table.cc b/src/kudu/tools/tool_action_table.cc
index b764781de..d43c2b299 100644
--- a/src/kudu/tools/tool_action_table.cc
+++ b/src/kudu/tools/tool_action_table.cc
@@ -84,9 +84,11 @@ using kudu::client::KuduTableCreator;
 using kudu::client::KuduTableStatistics;
 using kudu::client::KuduValue;
 using kudu::client::internal::ReplicaController;
+using kudu::tools::PartitionPB;
 using std::cerr;
 using std::cout;
 using std::endl;
+using std::make_unique;
 using std::map;
 using std::pair;
 using std::set;
@@ -150,6 +152,10 @@ DEFINE_string(lower_bound_type, "INCLUSIVE_BOUND",
 DEFINE_string(upper_bound_type, "EXCLUSIVE_BOUND",
               "The type of the upper bound, either inclusive or exclusive. "
               "Defaults to exclusive. This flag is case-insensitive.");
+DEFINE_string(hash_schema, "",
+              "String representation of range-specific hash schema as a JSON "
+              "object, e.g. "
+              "{\"hash_schema\": [{\"columns\": [\"c0\"], \"num_buckets\": 5}]}");
 DEFINE_int32(scan_batch_size, -1,
              "The size for scan results batches, in bytes. A negative value "
              "means the server-side default is used, where the server-side "
@@ -1000,18 +1006,52 @@ Status ModifyRangePartition(const RunnerContext& context, PartitionAction action
                                         upper_bound.get()));
 
   KuduTableCreator::RangePartitionBound lower_bound_type;
-  KuduTableCreator::RangePartitionBound upper_bound_type;
+  RETURN_NOT_OK(convert_bounds_type(
+      "lower bound", FLAGS_lower_bound_type, &lower_bound_type));
 
-  RETURN_NOT_OK(convert_bounds_type("lower bound", FLAGS_lower_bound_type, &lower_bound_type));
-  RETURN_NOT_OK(convert_bounds_type("upper bound", FLAGS_upper_bound_type, &upper_bound_type));
+  KuduTableCreator::RangePartitionBound upper_bound_type;
+  RETURN_NOT_OK(convert_bounds_type(
+      "upper bound", FLAGS_upper_bound_type, &upper_bound_type));
+
+  const auto& hash_schema_str = FLAGS_hash_schema;
+  PartitionPB::HashSchemaPB hash_schema;
+  if (!hash_schema_str.empty()) {
+    JsonParseOptions opts;
+    opts.case_insensitive_enum_parsing = true;
+    if (const auto& s = JsonStringToMessage(
+          hash_schema_str, &hash_schema, opts); !s.ok()) {
+      return Status::InvalidArgument(
+          Substitute("unable to parse JSON: $0", hash_schema_str),
+                     s.error_message().ToString());
+    }
+  }
 
   unique_ptr<KuduTableAlterer> alterer(client->NewTableAlterer(table_name));
   if (action == PartitionAction::ADD) {
-    return alterer->AddRangePartition(lower_bound.release(),
-                                      upper_bound.release(),
-                                      lower_bound_type,
-                                      upper_bound_type)->Alter();
+    if (hash_schema_str.empty()) {
+      // Add range partition with table-wide hash schema.
+      return alterer->AddRangePartition(lower_bound.release(),
+                                        upper_bound.release(),
+                                        lower_bound_type,
+                                        upper_bound_type)->Alter();
+    }
+
+    // Add range partition with custom hash schema.
+    auto p = make_unique<KuduRangePartition>(lower_bound.release(),
+                                             upper_bound.release(),
+                                             lower_bound_type,
+                                             upper_bound_type);
+    for (const auto& dimension_pb : hash_schema.hash_schema()) {
+      vector<string> columns;
+      for (const auto& column : dimension_pb.columns()) {
+        columns.emplace_back(column);
+      }
+      p->add_hash_partitions(
+          columns, dimension_pb.num_buckets(), dimension_pb.seed());
+    }
+    return alterer->AddRangePartition(p.release())->Alter();
   }
+
   DCHECK_EQ(PartitionAction::DROP, action);
   return alterer->DropRangePartition(lower_bound.release(),
                                      upper_bound.release(),
@@ -1790,6 +1830,7 @@ unique_ptr<Mode> BuildTableMode() {
                               "the upper range partition will be unbounded" })
       .AddOptionalParameter("lower_bound_type")
       .AddOptionalParameter("upper_bound_type")
+      .AddOptionalParameter("hash_schema")
       .Build();
 
   unique_ptr<Action> column_set_default =