Posted to commits@kudu.apache.org by al...@apache.org on 2022/06/16 15:22:21 UTC

[kudu] 02/02: KUDU-2671 update partition schema in catalog when adding range

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 6909ee4f800da192b72e59680916e5004527b6db
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Mon Jun 13 15:10:13 2022 -0700

    KUDU-2671 update partition schema in catalog when adding range
    
    When adding a range with a custom hash schema to a table, it's necessary
    to update the partition schema information stored in the system catalog
    accordingly.  That update was missing from one of the previous patches;
    this patch addresses the issue.
    
    This patch also adds a test scenario to catch regressions, if any.  The
    scenario was failing before the CatalogManager update introduced in
    this patch.  I also addressed nits pointed out by the TidyBot.
    
    This is a follow-up to 250eb90bc0e1f4f472f44de8a23ce213595d5ee7.
    
    Change-Id: I869458fb8bcb06801b54f2b4869e7826322563e0
    Reviewed-on: http://gerrit.cloudera.org:8080/18615
    Tested-by: Kudu Jenkins
    Reviewed-by: Mahesh Reddy <mr...@cloudera.com>
    Reviewed-by: Attila Bukor <ab...@apache.org>
---
 src/kudu/master/catalog_manager.cc |  48 +++++++++----
 src/kudu/master/catalog_manager.h  |  24 ++++---
 src/kudu/master/master-test.cc     | 137 ++++++++++++++++++++++++++++++++++++-
 3 files changed, 181 insertions(+), 28 deletions(-)
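
For context, the CatalogManager change below is exercised when a client adds
a range partition that carries its own hash schema.  The following is a
minimal, illustrative sketch of that client-side operation and is not part of
this commit: it assumes the public C++ client API from the KUDU-2671 work
(KuduRangePartition::add_hash_partitions() and
KuduTableAlterer::AddRangePartition()), so treat those names and signatures
as assumptions; the column name, range bounds, and bucket counts mirror the
new test scenario added in this patch.

#include <memory>
#include <string>
#include <vector>

#include "kudu/client/client.h"
#include "kudu/common/partial_row.h"
#include "kudu/util/status.h"

using kudu::KuduPartialRow;
using kudu::Status;
using kudu::client::KuduClient;
using kudu::client::KuduRangePartition;
using kudu::client::KuduTable;
using kudu::client::KuduTableAlterer;

// Add the range [100, 200) with its own hash schema (5 buckets, seed 1)
// to an existing table; client API names are assumptions, see note above.
Status AddRangeWithCustomHashSchema(
    const kudu::client::sp::shared_ptr<KuduClient>& client,
    const std::string& table_name) {
  kudu::client::sp::shared_ptr<KuduTable> table;
  KUDU_RETURN_NOT_OK(client->OpenTable(table_name, &table));

  // Range bounds [100, 200) on the key column, as in the new test scenario.
  std::unique_ptr<KuduPartialRow> lower(table->schema().NewRow());
  KUDU_RETURN_NOT_OK(lower->SetInt32("c_int32", 100));
  std::unique_ptr<KuduPartialRow> upper(table->schema().NewRow());
  KUDU_RETURN_NOT_OK(upper->SetInt32("c_int32", 200));

  // A range-specific hash schema that overrides the table-wide one;
  // the KuduRangePartition object takes ownership of the bound rows.
  std::unique_ptr<KuduRangePartition> range(
      new KuduRangePartition(lower.release(), upper.release()));
  KUDU_RETURN_NOT_OK(range->add_hash_partitions(
      { "c_int32" }, /*num_buckets=*/5, /*seed=*/1));

  // The alterer takes ownership of the released range partition object.
  std::unique_ptr<KuduTableAlterer> alterer(client->NewTableAlterer(table_name));
  alterer->AddRangePartition(range.release());
  return alterer->Alter();
}

With this patch, the master persists the range's custom hash schema in the
PartitionSchemaPB stored in the system catalog, so a subsequent
GetTableSchema() call returns it, as the new test scenario verifies.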

diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 726499d56..d23d01e64 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -2518,10 +2518,11 @@ Status CatalogManager::DeleteTable(const DeleteTableRequestPB& req,
   return Status::OK();
 }
 
-Status CatalogManager::ApplyAlterSchemaSteps(const SysTablesEntryPB& current_pb,
-                                             vector<AlterTableRequestPB::Step> steps,
-                                             Schema* new_schema,
-                                             ColumnId* next_col_id) {
+Status CatalogManager::ApplyAlterSchemaSteps(
+    const SysTablesEntryPB& current_pb,
+    const vector<AlterTableRequestPB::Step>& steps,
+    Schema* new_schema,
+    ColumnId* next_col_id) {
   const SchemaPB& current_schema_pb = current_pb.schema();
   Schema cur_schema;
   RETURN_NOT_OK(SchemaFromPB(current_schema_pb, &cur_schema));
@@ -2601,20 +2602,20 @@ Status CatalogManager::ApplyAlterSchemaSteps(const SysTablesEntryPB& current_pb,
 }
 
 Status CatalogManager::ApplyAlterPartitioningSteps(
-    const TableMetadataLock& l,
     const scoped_refptr<TableInfo>& table,
     const Schema& client_schema,
-    vector<AlterTableRequestPB::Step> steps,
+    const vector<AlterTableRequestPB::Step>& steps,
+    TableMetadataLock* l,
     vector<scoped_refptr<TabletInfo>>* tablets_to_add,
     vector<scoped_refptr<TabletInfo>>* tablets_to_drop) {
 
   // Get the table's schema as it's known to the catalog manager.
   Schema schema;
-  RETURN_NOT_OK(SchemaFromPB(l.data().pb.schema(), &schema));
+  RETURN_NOT_OK(SchemaFromPB(l->data().pb.schema(), &schema));
   // Build current PartitionSchema for the table.
   PartitionSchema partition_schema;
   RETURN_NOT_OK(PartitionSchema::FromPB(
-      l.data().pb.partition_schema(), schema, &partition_schema));
+      l->data().pb.partition_schema(), schema, &partition_schema));
   TableInfo::TabletInfoMap existing_tablets = table->tablet_map();
   TableInfo::TabletInfoMap new_tablets;
   auto abort_mutations = MakeScopedCleanup([&new_tablets]() {
@@ -2627,11 +2628,11 @@ Status CatalogManager::ApplyAlterPartitioningSteps(
   for (const auto& step : steps) {
     CHECK(step.type() == AlterTableRequestPB::ADD_RANGE_PARTITION ||
           step.type() == AlterTableRequestPB::DROP_RANGE_PARTITION);
-    const auto& range_bouds =
+    const auto& range_bounds =
         step.type() == AlterTableRequestPB::ADD_RANGE_PARTITION
         ? step.add_range_partition().range_bounds()
         : step.drop_range_partition().range_bounds();
-    RowOperationsPBDecoder decoder(&range_bouds, &client_schema, &schema, nullptr);
+    RowOperationsPBDecoder decoder(&range_bounds, &client_schema, &schema, nullptr);
     vector<DecodedRowOperation> ops;
     RETURN_NOT_OK(decoder.DecodeOperations<DecoderMode::SPLIT_ROWS>(&ops));
 
@@ -2675,6 +2676,23 @@ Status CatalogManager::ApplyAlterPartitioningSteps(
       }
       RETURN_NOT_OK(partition_schema.CreatePartitionsForRange(
           range_bound, hash_schema, schema, &partitions));
+
+      // Add information on the new range with custom hash schema into the
+      // PartitionSchema for the table stored in the system catalog.
+      auto* p = l->mutable_data()->pb.mutable_partition_schema();
+      auto* range = p->add_custom_hash_schema_ranges();
+      RowOperationsPBEncoder encoder(range->mutable_range_bounds());
+      encoder.Add(RowOperationsPB::RANGE_LOWER_BOUND, *ops[0].split_row);
+      encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, *ops[1].split_row);
+      for (const auto& hash_dimension : hash_schema) {
+        auto* hash_dimension_pb = range->add_hash_schema();
+        hash_dimension_pb->set_num_buckets(hash_dimension.num_buckets);
+        hash_dimension_pb->set_seed(hash_dimension.seed);
+        for (const auto& column_id : hash_dimension.column_ids) {
+          auto* column_pb = hash_dimension_pb->add_columns();
+          column_pb->set_id(column_id);
+        }
+      }
     } else {
       RETURN_NOT_OK(partition_schema.CreatePartitions(
           {}, { range_bound }, schema, &partitions));
@@ -3202,11 +3220,11 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB& req,
     TRACE("Apply alter partitioning");
     Schema client_schema;
     RETURN_NOT_OK(SetupError(SchemaFromPB(req.schema(), &client_schema),
-          resp, MasterErrorPB::UNKNOWN_ERROR));
-    RETURN_NOT_OK(SetupError(
-          ApplyAlterPartitioningSteps(l, table, client_schema, alter_partitioning_steps,
-            &tablets_to_add, &tablets_to_drop),
-          resp, MasterErrorPB::UNKNOWN_ERROR));
+        resp, MasterErrorPB::UNKNOWN_ERROR));
+    RETURN_NOT_OK(SetupError(ApplyAlterPartitioningSteps(
+        table, client_schema, alter_partitioning_steps, &l,
+        &tablets_to_add, &tablets_to_drop),
+                             resp, MasterErrorPB::UNKNOWN_ERROR));
   }
 
   // 8. Alter table's replication factor.
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index b23b5b727..f073f3347 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -1070,17 +1070,19 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
   // container must not be empty.
   Status DeleteTskEntries(const std::set<std::string>& entry_ids);
 
-  Status ApplyAlterSchemaSteps(const SysTablesEntryPB& current_pb,
-                               std::vector<AlterTableRequestPB::Step> steps,
-                               Schema* new_schema,
-                               ColumnId* next_col_id);
-
-  Status ApplyAlterPartitioningSteps(const TableMetadataLock& l,
-                                     const scoped_refptr<TableInfo>& table,
-                                     const Schema& client_schema,
-                                     std::vector<AlterTableRequestPB::Step> steps,
-                                     std::vector<scoped_refptr<TabletInfo>>* tablets_to_add,
-                                     std::vector<scoped_refptr<TabletInfo>>* tablets_to_drop);
+  Status ApplyAlterSchemaSteps(
+      const SysTablesEntryPB& current_pb,
+      const std::vector<AlterTableRequestPB::Step>& steps,
+      Schema* new_schema,
+      ColumnId* next_col_id);
+
+  Status ApplyAlterPartitioningSteps(
+      const scoped_refptr<TableInfo>& table,
+      const Schema& client_schema,
+      const std::vector<AlterTableRequestPB::Step>& steps,
+      TableMetadataLock* l,
+      std::vector<scoped_refptr<TabletInfo>>* tablets_to_add,
+      std::vector<scoped_refptr<TabletInfo>>* tablets_to_drop);
 
   // Task that takes care of the tablet assignments/creations.
   // Loops through the "not created" tablets and sends a CreateTablet() request.
diff --git a/src/kudu/master/master-test.cc b/src/kudu/master/master-test.cc
index 98ab68973..7196f96bc 100644
--- a/src/kudu/master/master-test.cc
+++ b/src/kudu/master/master-test.cc
@@ -43,6 +43,7 @@
 
 #include "kudu/common/common.pb.h"
 #include "kudu/common/partial_row.h"
+#include "kudu/common/partition.h"
 #include "kudu/common/row_operations.h"
 #include "kudu/common/row_operations.pb.h"
 #include "kudu/common/schema.h"
@@ -982,7 +983,7 @@ TEST_P(AlterTableWithRangeSpecificHashSchema, TestAlterTableWithDifferentHashDim
   ASSERT_EQ(2, tables.front()->num_tablets());
 
   // Submit the alter table request
-  proxy_->AlterTable(req, &resp, &controller);
+  ASSERT_OK(proxy_->AlterTable(req, &resp, &controller));
   if (has_different_dimensions_count) {
     ASSERT_TRUE(resp.has_error());
     ASSERT_STR_CONTAINS(resp.error().status().DebugString(),
@@ -997,10 +998,142 @@ TEST_P(AlterTableWithRangeSpecificHashSchema, TestAlterTableWithDifferentHashDim
     ASSERT_EQ(5, tables.front()->num_tablets());
   }
 }
-
 INSTANTIATE_TEST_SUITE_P(AlterTableWithCustomHashSchema,
                          AlterTableWithRangeSpecificHashSchema, ::testing::Bool());
 
+TEST_F(MasterTest, AlterTableAddRangeWithSpecificHashSchema) {
+  constexpr const char* const kTableName = "alter_table_custom_hash_schema";
+  constexpr const char* const kCol0 = "c_int32";
+  constexpr const char* const kCol1 = "c_int64";
+  const Schema kTableSchema({ColumnSchema(kCol0, INT32),
+                             ColumnSchema(kCol1, INT64)}, 1);
+  FLAGS_enable_per_range_hash_schemas = true;
+  FLAGS_default_num_replicas = 1;
+
+  // Create a table with one range partition based on the table-wide hash schema.
+  CreateTableResponsePB create_table_resp;
+  {
+    KuduPartialRow lower(&kTableSchema);
+    ASSERT_OK(lower.SetInt32(kCol0, 0));
+    KuduPartialRow upper(&kTableSchema);
+    ASSERT_OK(upper.SetInt32(kCol0, 100));
+    ASSERT_OK(CreateTable(
+        kTableName, kTableSchema, none, none, none, {}, {{lower, upper}},
+        {}, {{{kCol0}, 2, 0}}, &create_table_resp));
+  }
+
+  const auto& table_id = create_table_resp.table_id();
+  const HashSchema custom_hash_schema{{{kCol0}, 5, 1}};
+
+  // Alter the table, adding a new range with custom hash schema.
+  {
+    AlterTableRequestPB req;
+    AlterTableResponsePB resp;
+    req.mutable_table()->set_table_name(kTableName);
+    req.mutable_table()->set_table_id(table_id);
+
+    // Add the required information on the table's schema:
+    // key and non-null columns must be present in the request.
+    {
+      ColumnSchemaPB* col0 = req.mutable_schema()->add_columns();
+      col0->set_name(kCol0);
+      col0->set_type(INT32);
+      col0->set_is_key(true);
+
+      ColumnSchemaPB* col1 = req.mutable_schema()->add_columns();
+      col1->set_name(kCol1);
+      col1->set_type(INT64);
+    }
+
+    AlterTableRequestPB::Step* step = req.add_alter_schema_steps();
+    step->set_type(AlterTableRequestPB::ADD_RANGE_PARTITION);
+    KuduPartialRow lower(&kTableSchema);
+    ASSERT_OK(lower.SetInt32(kCol0, 100));
+    KuduPartialRow upper(&kTableSchema);
+    ASSERT_OK(upper.SetInt32(kCol0, 200));
+    RowOperationsPBEncoder enc(
+        step->mutable_add_range_partition()->mutable_range_bounds());
+    enc.Add(RowOperationsPB::RANGE_LOWER_BOUND, lower);
+    enc.Add(RowOperationsPB::RANGE_UPPER_BOUND, upper);
+    for (const auto& hash_dimension: custom_hash_schema) {
+      auto* hash_dimension_pb =
+          step->mutable_add_range_partition()->add_custom_hash_schema();
+      for (const auto& col_name: hash_dimension.columns) {
+        hash_dimension_pb->add_columns()->set_name(col_name);
+      }
+      hash_dimension_pb->set_num_buckets(hash_dimension.num_buckets);
+      hash_dimension_pb->set_seed(hash_dimension.seed);
+    }
+
+    // Check the number of tablets in the table before ALTER TABLE.
+    {
+      CatalogManager::ScopedLeaderSharedLock l(master_->catalog_manager());
+      std::vector<scoped_refptr<TableInfo>> tables;
+      master_->catalog_manager()->GetAllTables(&tables);
+      ASSERT_EQ(1, tables.size());
+      // 2 tablets (because of 2 hash buckets) for already existing range.
+      ASSERT_EQ(2, tables.front()->num_tablets());
+    }
+
+    RpcController ctl;
+    ASSERT_OK(proxy_->AlterTable(req, &resp, &ctl));
+    ASSERT_FALSE(resp.has_error())
+        << StatusFromPB(resp.error().status()).ToString();
+
+    // Check the number of tablets in the table after ALTER TABLE.
+    {
+      CatalogManager::ScopedLeaderSharedLock l(master_->catalog_manager());
+      std::vector<scoped_refptr<TableInfo>> tables;
+      master_->catalog_manager()->GetAllTables(&tables);
+      ASSERT_EQ(1, tables.size());
+      // Extra 5 tablets (because of 5 hash buckets) for newly added range.
+      ASSERT_EQ(7, tables.front()->num_tablets());
+    }
+  }
+
+  // Now verify the table's schema: fetch the information on the altered
+  // table and make sure the schema contains information on the newly added
+  // range partition with the custom hash schema.
+  {
+    GetTableSchemaRequestPB req;
+    req.mutable_table()->set_table_name(kTableName);
+
+    RpcController ctl;
+    GetTableSchemaResponsePB resp;
+    ASSERT_OK(proxy_->GetTableSchema(req, &resp, &ctl));
+    ASSERT_FALSE(resp.has_error())
+        << StatusFromPB(resp.error().status()).ToString();
+
+    Schema received_schema;
+    ASSERT_TRUE(resp.has_schema());
+    ASSERT_OK(SchemaFromPB(resp.schema(), &received_schema));
+    ASSERT_TRUE(kTableSchema == received_schema) << Substitute(
+        "$0 not equal to $1", kTableSchema.ToString(), received_schema.ToString());
+
+    ASSERT_TRUE(resp.has_table_id());
+    ASSERT_EQ(table_id, resp.table_id());
+    ASSERT_TRUE(resp.has_table_name());
+    ASSERT_EQ(kTableName, resp.table_name());
+
+    ASSERT_TRUE(resp.has_partition_schema());
+    PartitionSchema ps;
+    ASSERT_OK(PartitionSchema::FromPB(
+        resp.partition_schema(), received_schema, &ps));
+    ASSERT_TRUE(ps.HasCustomHashSchemas());
+
+    const auto& table_wide_hash_schema = ps.hash_schema();
+    ASSERT_EQ(1, table_wide_hash_schema.size());
+    ASSERT_EQ(2, table_wide_hash_schema.front().num_buckets);
+
+    const auto& ranges_with_hash_schemas = ps.ranges_with_hash_schemas();
+    ASSERT_EQ(ranges_with_hash_schemas.size(), 1);
+    const auto& custom_hash_schema = ranges_with_hash_schemas.front().hash_schema;
+    ASSERT_EQ(1, custom_hash_schema.size());
+    ASSERT_EQ(5, custom_hash_schema.front().num_buckets);
+    ASSERT_EQ(1, custom_hash_schema.front().seed);
+  }
+}
+
 TEST_F(MasterTest, TestCreateTableCheckRangeInvariants) {
   constexpr const char* const kTableName = "testtb";
   const Schema kTableSchema({ ColumnSchema("key", INT32), ColumnSchema("val", INT32) }, 1);