You are viewing a plain-text version of this content; the link to its canonical version was not preserved here.
Posted to commits@kudu.apache.org by al...@apache.org on 2022/06/09 17:08:30 UTC

[kudu] branch master updated: [master] KUDU-2671: Range specific hashing during table alter op.

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 250eb90bc [master] KUDU-2671: Range specific hashing during table alter op.
250eb90bc is described below

commit 250eb90bc0e1f4f472f44de8a23ce213595d5ee7
Author: Abhishek Chennaka <ac...@cloudera.com>
AuthorDate: Tue May 10 10:09:31 2022 -0400

    [master] KUDU-2671: Range specific hashing during table alter op.
    
    This commit contains the master-side changes needed to support
    range-specific hash schemas when adding a range partition via an
    ALTER TABLE operation. A basic test for this functionality is added
    as well.
    
    Change-Id: Iea9e3317d172c9ae76662c44b21fca9a4819930a
    Reviewed-on: http://gerrit.cloudera.org:8080/18515
    Tested-by: Alexey Serbin <al...@apache.org>
    Reviewed-by: Alexey Serbin <al...@apache.org>
---
 src/kudu/master/catalog_manager.cc |  16 ++++-
 src/kudu/master/master-test.cc     | 128 ++++++++++++++++++++++++++++++++++---
 2 files changed, 134 insertions(+), 10 deletions(-)

diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 2928466f4..ab3380f23 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -2618,7 +2618,6 @@ Status CatalogManager::ApplyAlterPartitioningSteps(
   PartitionSchema partition_schema;
   RETURN_NOT_OK(PartitionSchema::FromPB(
       l.data().pb.partition_schema(), schema, &partition_schema));
-
   TableInfo::TabletInfoMap existing_tablets = table->tablet_map();
   TableInfo::TabletInfoMap new_tablets;
   auto abort_mutations = MakeScopedCleanup([&new_tablets]() {
@@ -2627,9 +2626,22 @@ Status CatalogManager::ApplyAlterPartitioningSteps(
     }
   });
 
   for (const auto& step : steps) {
     vector<DecodedRowOperation> ops;
+    vector<PartitionSchema::HashSchema> range_hash_schemas;  // Fresh for each step.
     if (step.type() == AlterTableRequestPB::ADD_RANGE_PARTITION) {
+      if (FLAGS_enable_per_range_hash_schemas &&
+          step.add_range_partition().custom_hash_schema_size() > 0) {
+        const Schema schema_with_ids = client_schema.CopyWithColumnIds();
+        PartitionSchema::HashSchema hash_schema;
+        RETURN_NOT_OK(PartitionSchema::ExtractHashSchemaFromPB(
+            schema_with_ids, step.add_range_partition().custom_hash_schema(), &hash_schema));
+        if (partition_schema.hash_schema().size() != hash_schema.size()) {
+          return Status::NotSupported(
+              "varying number of hash dimensions per range is not yet supported");
+        }
+        range_hash_schemas.emplace_back(std::move(hash_schema));
+      }
       RowOperationsPBDecoder decoder(&step.add_range_partition().range_bounds(),
                                      &client_schema, &schema, nullptr);
       RETURN_NOT_OK(decoder.DecodeOperations<DecoderMode::SPLIT_ROWS>(&ops));
@@ -2666,7 +2678,7 @@ Status CatalogManager::ApplyAlterPartitioningSteps(
 
     vector<Partition> partitions;
     RETURN_NOT_OK(partition_schema.CreatePartitions(
-        {}, {{ *ops[0].split_row, *ops[1].split_row }}, {}, schema, &partitions));
+        {}, {{ *ops[0].split_row, *ops[1].split_row }}, range_hash_schemas, schema, &partitions));
     switch (step.type()) {
       case AlterTableRequestPB::ADD_RANGE_PARTITION: {
         for (const Partition& partition : partitions) {
diff --git a/src/kudu/master/master-test.cc b/src/kudu/master/master-test.cc
index db345476c..5e13ed68c 100644
--- a/src/kudu/master/master-test.cc
+++ b/src/kudu/master/master-test.cc
@@ -197,6 +197,7 @@ class MasterTest : public KuduTest {
                      const vector<pair<KuduPartialRow, KuduPartialRow>>& bounds = {},
                      const vector<HashSchema>& range_hash_schemas = {});
 
+  // Overload used by the others; also sets a table-wide hash schema and fills 'resp'.
   Status CreateTable(const string& name,
                      const Schema& schema,
                      const optional<TableTypePB>& type,
@@ -204,7 +205,9 @@ class MasterTest : public KuduTest {
                      const optional<string>& comment,
                      const vector<KuduPartialRow>& split_rows,
                      const vector<pair<KuduPartialRow, KuduPartialRow>>& bounds,
-                     const vector<HashSchema>& range_hash_schemas);
+                     const vector<HashSchema>& range_hash_schemas,
+                     const HashSchema& table_wide_hash_schema,
+                     CreateTableResponsePB* resp);
 
   shared_ptr<Messenger> client_messenger_;
   unique_ptr<MiniMaster> mini_master_;
@@ -223,8 +226,9 @@ Status MasterTest::CreateTable(const string& name,
   KuduPartialRow split2(&schema);
   RETURN_NOT_OK(split2.SetInt32("key", 20));
 
+  CreateTableResponsePB resp;
   return CreateTable(
-      name, schema, type, owner, comment, { split1, split2 }, {}, {});
+      name, schema, type, owner, comment, { split1, split2 }, {}, {}, {}, &resp);
 }
 
 Status MasterTest::CreateTable(
@@ -233,8 +237,9 @@ Status MasterTest::CreateTable(
     const vector<KuduPartialRow>& split_rows,
     const vector<pair<KuduPartialRow, KuduPartialRow>>& bounds,
     const vector<HashSchema>& range_hash_schemas) {
+  CreateTableResponsePB resp;
   return CreateTable(
-        name, schema, none, none, none, split_rows, bounds, range_hash_schemas);
+        name, schema, none, none, none, split_rows, bounds, range_hash_schemas, {}, &resp);
 }
 
 Status MasterTest::CreateTable(
@@ -245,7 +250,9 @@ Status MasterTest::CreateTable(
     const optional<string>& comment,
     const vector<KuduPartialRow>& split_rows,
     const vector<pair<KuduPartialRow, KuduPartialRow>>& bounds,
-    const vector<HashSchema>& range_hash_schemas) {
+    const vector<HashSchema>& range_hash_schemas,
+    const HashSchema& table_wide_hash_schema,
+    CreateTableResponsePB* resp) {
 
   if (!range_hash_schemas.empty() && range_hash_schemas.size() != bounds.size()) {
     return Status::InvalidArgument(
@@ -268,6 +275,16 @@ Status MasterTest::CreateTable(
   }
 
   auto* ps_pb = req.mutable_partition_schema();
+  // Table-wide hash schema: one PB entry per hash dimension.
+  for (const auto& hash_dimension : table_wide_hash_schema) {
+    auto* hash_schema = ps_pb->add_hash_schema();
+    for (const string& col_name : hash_dimension.columns) {
+      hash_schema->add_columns()->set_name(col_name);
+    }
+    hash_schema->set_num_buckets(hash_dimension.num_buckets);
+    hash_schema->set_seed(hash_dimension.seed);
+  }
+
   for (size_t i = 0; i < range_hash_schemas.size(); ++i) {
     auto* range = ps_pb->add_custom_hash_schema_ranges();
     RowOperationsPBEncoder encoder(range->mutable_range_bounds());
@@ -296,10 +313,9 @@ Status MasterTest::CreateTable(
     controller.RequireServerFeature(MasterFeatures::RANGE_PARTITION_BOUNDS);
   }
 
-  CreateTableResponsePB resp;
-  RETURN_NOT_OK(proxy_->CreateTable(req, &resp, &controller));
-  if (resp.has_error()) {
-    RETURN_NOT_OK(StatusFromPB(resp.error().status()));
+  RETURN_NOT_OK(proxy_->CreateTable(req, resp, &controller));
+  if (resp->has_error()) {
+    RETURN_NOT_OK(StatusFromPB(resp->error().status()));
   }
   return Status::OK();
 }
@@ -935,6 +951,102 @@ TEST_F(MasterTest, ListTablesWithTableFilter) {
   }
 }
 
+class AlterTableWithRangeSpecificHashSchema : public MasterTest,
+                                              public ::testing::WithParamInterface<bool> {};
+
+TEST_P(AlterTableWithRangeSpecificHashSchema, TestAlterTableWithDifferentHashDimensions) {
+  constexpr const char* const kTableName = "testtb";
+  const Schema kTableSchema({ColumnSchema("key", INT32),
+                             ColumnSchema("val", INT32)}, 2);
+  FLAGS_enable_per_range_hash_schemas = true; // enable for testing.
+  FLAGS_default_num_replicas = 1;
+
+  // Create a table with one range partition (key 0 to 100) and 2 hash buckets.
+  KuduPartialRow a_lower(&kTableSchema);
+  KuduPartialRow a_upper(&kTableSchema);
+  ASSERT_OK(a_lower.SetInt32("key", 0));
+  ASSERT_OK(a_upper.SetInt32("key", 100));
+  CreateTableResponsePB create_table_resp;
+  ASSERT_OK(CreateTable(
+      kTableName, kTableSchema, none, none, none, {}, {{a_lower, a_upper}},
+      {}, {{{"key"}, 2, 0}}, &create_table_resp));
+
+  // Custom hash schema for the new range: with the test parameter set, it has
+  // 2 hash dimensions, which differs from the table-wide hash schema's 1.
+  HashSchema custom_range_hash_schema;
+  const bool has_different_dimensions_count = GetParam();
+  if (has_different_dimensions_count) {
+    custom_range_hash_schema = {{{"key"}, 3, 0},
+                                {{"val"}, 3, 0}};
+  } else {
+    custom_range_hash_schema = {{{"key"}, 3, 0}};
+  }
+
+  // Create AlterTableRequestPB and populate it for the alter table operation.
+  AlterTableRequestPB req;
+  AlterTableResponsePB resp;
+  RpcController controller;
+  req.mutable_table()->set_table_name(kTableName);
+  req.mutable_table()->set_table_id(create_table_resp.table_id());
+
+  AlterTableRequestPB::Step* step = req.add_alter_schema_steps();
+  step->set_type(AlterTableRequestPB::ADD_RANGE_PARTITION);
+  KuduPartialRow lower(&kTableSchema);
+  ASSERT_OK(lower.SetInt32("key", 200));
+  KuduPartialRow upper(&kTableSchema);
+  ASSERT_OK(upper.SetInt32("key", 300));
+  RowOperationsPBEncoder splits_encoder(
+      step->mutable_add_range_partition()->mutable_range_bounds());
+  splits_encoder.Add(RowOperationsPB::RANGE_LOWER_BOUND, lower);
+  splits_encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, upper);
+  for (const auto& hash_dimension : custom_range_hash_schema) {
+    auto* hash_dimension_pb = step->mutable_add_range_partition()->add_custom_hash_schema();
+    for (const string& col_name : hash_dimension.columns) {
+      hash_dimension_pb->add_columns()->set_name(col_name);
+    }
+    hash_dimension_pb->set_num_buckets(hash_dimension.num_buckets);
+    hash_dimension_pb->set_seed(hash_dimension.seed);
+  }
+
+  ColumnSchemaPB* col1 = req.mutable_schema()->add_columns();
+  col1->set_name("key");
+  col1->set_type(INT32);
+  col1->set_is_key(true);
+
+  ColumnSchemaPB* col2 = req.mutable_schema()->add_columns();
+  col2->set_name("val");
+  col2->set_type(INT32);
+  col2->set_is_key(true);
+
+  // Before the alter: one range partition with 2 hash buckets, i.e. 2 tablets.
+  vector<scoped_refptr<TableInfo>> tables;
+  {
+    CatalogManager::ScopedLeaderSharedLock l(master_->catalog_manager());
+    master_->catalog_manager()->GetAllTables(&tables);
+  }
+  ASSERT_EQ(1, tables.size());
+  ASSERT_EQ(2, tables.front()->num_tablets());
+
+  // Submit the alter table request; errors from the alter are reported in 'resp'.
+  ASSERT_OK(proxy_->AlterTable(req, &resp, &controller));
+  if (has_different_dimensions_count) {
+    ASSERT_TRUE(resp.has_error());
+    ASSERT_STR_CONTAINS(resp.error().status().DebugString(),
+                        "varying number of hash dimensions per range is not yet supported");
+  } else {
+    ASSERT_FALSE(resp.has_error());
+    {
+      CatalogManager::ScopedLeaderSharedLock l(master_->catalog_manager());
+      master_->catalog_manager()->GetAllTables(&tables);
+    }
+    ASSERT_EQ(1, tables.size());
+    ASSERT_EQ(5, tables.front()->num_tablets());  // 2 existing + 3 for the new range.
+  }
+}
+
+INSTANTIATE_TEST_SUITE_P(AlterTableWithCustomHashSchema,
+                         AlterTableWithRangeSpecificHashSchema, ::testing::Bool());
+
 TEST_F(MasterTest, TestCreateTableCheckRangeInvariants) {
   constexpr const char* const kTableName = "testtb";
   const Schema kTableSchema({ ColumnSchema("key", INT32), ColumnSchema("val", INT32) }, 1);