You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2022/06/09 17:08:30 UTC
[kudu] branch master updated: [master] KUDU-2671: Range specific hashing during table alter op.
This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 250eb90bc [master] KUDU-2671: Range specific hashing during table alter op.
250eb90bc is described below
commit 250eb90bc0e1f4f472f44de8a23ce213595d5ee7
Author: Abhishek Chennaka <ac...@cloudera.com>
AuthorDate: Tue May 10 10:09:31 2022 -0400
[master] KUDU-2671: Range specific hashing during table alter op.
This commit contains the master-side changes needed to support
range-specific hash schemas when a range partition is added by an
alter table operation. A basic test covering this is added as well.
Change-Id: Iea9e3317d172c9ae76662c44b21fca9a4819930a
Reviewed-on: http://gerrit.cloudera.org:8080/18515
Tested-by: Alexey Serbin <al...@apache.org>
Reviewed-by: Alexey Serbin <al...@apache.org>
---
src/kudu/master/catalog_manager.cc | 16 ++++-
src/kudu/master/master-test.cc | 128 ++++++++++++++++++++++++++++++++++---
2 files changed, 134 insertions(+), 10 deletions(-)
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 2928466f4..ab3380f23 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -2618,7 +2618,6 @@ Status CatalogManager::ApplyAlterPartitioningSteps(
PartitionSchema partition_schema;
RETURN_NOT_OK(PartitionSchema::FromPB(
l.data().pb.partition_schema(), schema, &partition_schema));
-
TableInfo::TabletInfoMap existing_tablets = table->tablet_map();
TableInfo::TabletInfoMap new_tablets;
auto abort_mutations = MakeScopedCleanup([&new_tablets]() {
@@ -2627,9 +2626,22 @@ Status CatalogManager::ApplyAlterPartitioningSteps(
}
});
+ vector<PartitionSchema::HashSchema> range_hash_schemas;
for (const auto& step : steps) {
vector<DecodedRowOperation> ops;
if (step.type() == AlterTableRequestPB::ADD_RANGE_PARTITION) {
+ if (FLAGS_enable_per_range_hash_schemas &&
+ step.add_range_partition().custom_hash_schema_size() > 0) {
+ const Schema schema = client_schema.CopyWithColumnIds();
+ PartitionSchema::HashSchema hash_schema;
+ RETURN_NOT_OK(PartitionSchema::ExtractHashSchemaFromPB(
+ schema, step.add_range_partition().custom_hash_schema(), &hash_schema));
+ if (partition_schema.hash_schema().size() != hash_schema.size()) {
+ return Status::NotSupported(
+ "varying number of hash dimensions per range is not yet supported");
+ }
+ range_hash_schemas.emplace_back(std::move(hash_schema));
+ }
RowOperationsPBDecoder decoder(&step.add_range_partition().range_bounds(),
&client_schema, &schema, nullptr);
RETURN_NOT_OK(decoder.DecodeOperations<DecoderMode::SPLIT_ROWS>(&ops));
@@ -2666,7 +2678,7 @@ Status CatalogManager::ApplyAlterPartitioningSteps(
vector<Partition> partitions;
RETURN_NOT_OK(partition_schema.CreatePartitions(
- {}, {{ *ops[0].split_row, *ops[1].split_row }}, {}, schema, &partitions));
+ {}, {{ *ops[0].split_row, *ops[1].split_row }}, range_hash_schemas, schema, &partitions));
switch (step.type()) {
case AlterTableRequestPB::ADD_RANGE_PARTITION: {
for (const Partition& partition : partitions) {
diff --git a/src/kudu/master/master-test.cc b/src/kudu/master/master-test.cc
index db345476c..5e13ed68c 100644
--- a/src/kudu/master/master-test.cc
+++ b/src/kudu/master/master-test.cc
@@ -197,6 +197,7 @@ class MasterTest : public KuduTest {
const vector<pair<KuduPartialRow, KuduPartialRow>>& bounds = {},
const vector<HashSchema>& range_hash_schemas = {});
+
Status CreateTable(const string& name,
const Schema& schema,
const optional<TableTypePB>& type,
@@ -204,7 +205,9 @@ class MasterTest : public KuduTest {
const optional<string>& comment,
const vector<KuduPartialRow>& split_rows,
const vector<pair<KuduPartialRow, KuduPartialRow>>& bounds,
- const vector<HashSchema>& range_hash_schemas);
+ const vector<HashSchema>& range_hash_schemas,
+ const HashSchema& table_wide_hash_schema,
+ CreateTableResponsePB* resp);
shared_ptr<Messenger> client_messenger_;
unique_ptr<MiniMaster> mini_master_;
@@ -223,8 +226,9 @@ Status MasterTest::CreateTable(const string& name,
KuduPartialRow split2(&schema);
RETURN_NOT_OK(split2.SetInt32("key", 20));
+ CreateTableResponsePB resp;
return CreateTable(
- name, schema, type, owner, comment, { split1, split2 }, {}, {});
+ name, schema, type, owner, comment, { split1, split2 }, {}, {}, {}, &resp);
}
Status MasterTest::CreateTable(
@@ -233,8 +237,9 @@ Status MasterTest::CreateTable(
const vector<KuduPartialRow>& split_rows,
const vector<pair<KuduPartialRow, KuduPartialRow>>& bounds,
const vector<HashSchema>& range_hash_schemas) {
+ CreateTableResponsePB resp;
return CreateTable(
- name, schema, none, none, none, split_rows, bounds, range_hash_schemas);
+ name, schema, none, none, none, split_rows, bounds, range_hash_schemas, {}, &resp);
}
Status MasterTest::CreateTable(
@@ -245,7 +250,9 @@ Status MasterTest::CreateTable(
const optional<string>& comment,
const vector<KuduPartialRow>& split_rows,
const vector<pair<KuduPartialRow, KuduPartialRow>>& bounds,
- const vector<HashSchema>& range_hash_schemas) {
+ const vector<HashSchema>& range_hash_schemas,
+ const HashSchema& table_wide_hash_schema,
+ CreateTableResponsePB* resp) {
if (!range_hash_schemas.empty() && range_hash_schemas.size() != bounds.size()) {
return Status::InvalidArgument(
@@ -268,6 +275,16 @@ Status MasterTest::CreateTable(
}
auto* ps_pb = req.mutable_partition_schema();
+
+  // Copy each dimension of the table-wide hash schema (columns, number of
+  // buckets and seed) into the partition schema of the request.
+  for (const auto& hash_dimension : table_wide_hash_schema) {
+    auto* hash_schema = ps_pb->add_hash_schema();
+    for (const string& col_name : hash_dimension.columns) {
+      hash_schema->add_columns()->set_name(col_name);
+    }
+    hash_schema->set_num_buckets(hash_dimension.num_buckets);
+    hash_schema->set_seed(hash_dimension.seed);
+  }
+
for (size_t i = 0; i < range_hash_schemas.size(); ++i) {
auto* range = ps_pb->add_custom_hash_schema_ranges();
RowOperationsPBEncoder encoder(range->mutable_range_bounds());
@@ -296,10 +313,9 @@ Status MasterTest::CreateTable(
controller.RequireServerFeature(MasterFeatures::RANGE_PARTITION_BOUNDS);
}
- CreateTableResponsePB resp;
- RETURN_NOT_OK(proxy_->CreateTable(req, &resp, &controller));
- if (resp.has_error()) {
- RETURN_NOT_OK(StatusFromPB(resp.error().status()));
+ RETURN_NOT_OK(proxy_->CreateTable(req, resp, &controller));
+ if (resp->has_error()) {
+ RETURN_NOT_OK(StatusFromPB(resp->error().status()));
}
return Status::OK();
}
@@ -935,6 +951,102 @@ TEST_F(MasterTest, ListTablesWithTableFilter) {
}
}
+// Fixture for value-parameterized tests built on top of MasterTest. The
+// boolean parameter is read by the test body via GetParam().
+class AlterTableWithRangeSpecificHashSchema : public MasterTest,
+ public ::testing::WithParamInterface<bool> {};
+
+// Adds a range partition with a custom hash schema to an existing table via
+// the AlterTable RPC. When the test parameter is 'true', the custom hash
+// schema has two hash dimensions while the table-wide one has a single
+// dimension, and the master is expected to reject the request. Otherwise the
+// request is expected to succeed and the table to grow from 2 to 5 tablets.
+TEST_P(AlterTableWithRangeSpecificHashSchema, TestAlterTableWithDifferentHashDimensions) {
+  constexpr const char* const kTableName = "testtb";
+  const Schema kTableSchema({ColumnSchema("key", INT32),
+                             ColumnSchema("val", INT32)}, 2);
+  FLAGS_enable_per_range_hash_schemas = true; // enable for testing.
+  FLAGS_default_num_replicas = 1;
+
+  // Create a table with one range partition (bounds 0 and 100 on "key") and
+  // a table-wide hash schema with a single dimension on "key".
+  KuduPartialRow a_lower(&kTableSchema);
+  KuduPartialRow a_upper(&kTableSchema);
+  ASSERT_OK(a_lower.SetInt32("key", 0));
+  ASSERT_OK(a_upper.SetInt32("key", 100));
+  CreateTableResponsePB create_table_resp;
+  ASSERT_OK(CreateTable(
+      kTableName, kTableSchema, none, none, none, {}, {{a_lower, a_upper}},
+      {}, {{{"key"}, 2, 0}}, &create_table_resp));
+
+  // Populate the custom hash schema with a different number of hash
+  // dimensions depending on the test parameter.
+  HashSchema custom_range_hash_schema;
+  const bool has_different_dimensions_count = GetParam();
+  if (has_different_dimensions_count) {
+    custom_range_hash_schema = {{{"key"}, 3, 0},
+                                {{"val"}, 3, 0}};
+  } else {
+    custom_range_hash_schema = {{{"key"}, 3, 0}};
+  }
+
+  // Create AlterTableRequestPB and populate it for the alter table operation.
+  AlterTableRequestPB req;
+  AlterTableResponsePB resp;
+  RpcController controller;
+  req.mutable_table()->set_table_name(kTableName);
+  req.mutable_table()->set_table_id(create_table_resp.table_id());
+
+  // The new range has bounds 200 and 300 on "key" and carries the custom
+  // hash schema built above.
+  AlterTableRequestPB::Step* step = req.add_alter_schema_steps();
+  step->set_type(AlterTableRequestPB::ADD_RANGE_PARTITION);
+  KuduPartialRow lower(&kTableSchema);
+  ASSERT_OK(lower.SetInt32("key", 200));
+  KuduPartialRow upper(&kTableSchema);
+  ASSERT_OK(upper.SetInt32("key", 300));
+  RowOperationsPBEncoder splits_encoder(
+      step->mutable_add_range_partition()->mutable_range_bounds());
+  splits_encoder.Add(RowOperationsPB::RANGE_LOWER_BOUND, lower);
+  splits_encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, upper);
+  for (const auto& hash_dimension : custom_range_hash_schema) {
+    auto* hash_dimension_pb = step->mutable_add_range_partition()->add_custom_hash_schema();
+    for (const string& col_name : hash_dimension.columns) {
+      hash_dimension_pb->add_columns()->set_name(col_name);
+    }
+    hash_dimension_pb->set_num_buckets(hash_dimension.num_buckets);
+    hash_dimension_pb->set_seed(hash_dimension.seed);
+  }
+
+  // The request also carries the schema of the table: both columns are key
+  // columns, matching kTableSchema above.
+  ColumnSchemaPB* col1 = req.mutable_schema()->add_columns();
+  col1->set_name("key");
+  col1->set_type(INT32);
+  col1->set_is_key(true);
+
+  ColumnSchemaPB* col2 = req.mutable_schema()->add_columns();
+  col2->set_name("val");
+  col2->set_type(INT32);
+  col2->set_is_key(true);
+
+  // Check the number of tablets in the table before altering it.
+  std::vector<scoped_refptr<TableInfo>> tables;
+  {
+    CatalogManager::ScopedLeaderSharedLock l(master_->catalog_manager());
+    master_->catalog_manager()->GetAllTables(&tables);
+  }
+  ASSERT_EQ(1, tables.size());
+  ASSERT_EQ(2, tables.front()->num_tablets());
+
+  // Submit the alter table request. The RPC itself must succeed in both
+  // scenarios: application-level errors are reported in the response.
+  ASSERT_OK(proxy_->AlterTable(req, &resp, &controller));
+  if (has_different_dimensions_count) {
+    ASSERT_TRUE(resp.has_error());
+    ASSERT_STR_CONTAINS(resp.error().status().DebugString(),
+                        "varying number of hash dimensions per range is not yet supported");
+  } else {
+    ASSERT_FALSE(resp.has_error());
+    {
+      CatalogManager::ScopedLeaderSharedLock l(master_->catalog_manager());
+      master_->catalog_manager()->GetAllTables(&tables);
+    }
+    ASSERT_EQ(1, tables.size());
+    // Two tablets of the original range plus three of the newly added one.
+    ASSERT_EQ(5, tables.front()->num_tablets());
+  }
+}
+
+// Instantiate the parameterized suite over ::testing::Bool(), i.e. run the
+// test with both 'false' and 'true' as the parameter.
+INSTANTIATE_TEST_SUITE_P(AlterTableWithCustomHashSchema,
+ AlterTableWithRangeSpecificHashSchema, ::testing::Bool());
+
TEST_F(MasterTest, TestCreateTableCheckRangeInvariants) {
constexpr const char* const kTableName = "testtb";
const Schema kTableSchema({ ColumnSchema("key", INT32), ColumnSchema("val", INT32) }, 1);