Posted to commits@kudu.apache.org by aw...@apache.org on 2021/02/24 00:08:29 UTC

[kudu] branch master updated: KUDU-2671: Adds compatibility for per range hash schemas with unbounded ranges.

This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new d7b5abc  KUDU-2671: Adds compatibility for per range hash schemas with unbounded ranges.
d7b5abc is described below

commit d7b5abc027d492a60ebf5059b27541fc04cfaab3
Author: Mahesh Reddy <mr...@cloudera.com>
AuthorDate: Fri Feb 19 17:14:29 2021 -0800

    KUDU-2671: Adds compatibility for per range hash schemas with unbounded ranges.
    
    This patch updates the logic at the end of PartitionSchema::CreatePartitions()
    to allow per range hash schemas to be compatible with unbounded ranges. Some
    additional context about that block of code is given below.
    
    For the start partition key, the code iterates in reverse order through the
    partition's hash buckets. It breaks out of the loop at the first bucket that
    is not equal to 0; if a bucket is equal to 0, it erases that bucket's portion
    of the partition key. Essentially, if all hash buckets are equal to 0, the
    entire key is erased.
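
    A minimal sketch of this start-key truncation, with assumed, simplified
    parameters rather than the exact Kudu code ('encoded_bucket_size' plays the
    role of kEncodedBucketSize in the patch):

      #include <cstdint>
      #include <string>
      #include <vector>

      // Walk the hash buckets in reverse; erase the encoded portion of every
      // trailing zero-valued bucket and stop at the first non-zero bucket.
      void TruncateStartKey(const std::vector<int32_t>& hash_buckets,
                            size_t encoded_bucket_size,
                            std::string* partition_key_start) {
        for (int i = static_cast<int>(hash_buckets.size()) - 1; i >= 0; i--) {
          if (hash_buckets[i] != 0) {
            break;  // Keep the remainder of the key.
          }
          partition_key_start->erase(encoded_bucket_size * i);
        }
      }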
    
    For the end partition key, the code also iterates in reverse order through
    the partition's hash buckets. It first erases the current bucket's portion
    of the partition key. It then checks whether the current hash bucket is the
    max bucket of the current hash schema. If it is not the max, it encodes the
    current hash bucket + 1 at that portion of the key and breaks out of the
    loop. If it is the max, it continues the loop. Essentially, if all the hash
    buckets are the max, the entire key is erased.
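
    A corresponding minimal sketch of the end-key logic, again with assumed,
    simplified parameters rather than the exact Kudu code ('encode_bucket'
    stands in for the hash encoder and 'num_buckets' holds the bucket count of
    each hash level for this partition's range):

      #include <cstdint>
      #include <functional>
      #include <string>
      #include <vector>

      // From the last hash bucket backwards: erase the bucket's portion of the
      // key; if the bucket is not the max for its level, encode bucket + 1 and
      // stop, otherwise keep erasing toward the front of the key.
      void TruncateEndKey(const std::vector<int32_t>& hash_buckets,
                          const std::vector<int32_t>& num_buckets,
                          size_t encoded_bucket_size,
                          const std::function<void(int32_t, std::string*)>& encode_bucket,
                          std::string* partition_key_end) {
        for (int i = static_cast<int>(hash_buckets.size()) - 1; i >= 0; i--) {
          partition_key_end->erase(encoded_bucket_size * i);
          int32_t next_bucket = hash_buckets[i] + 1;
          if (next_bucket != num_buckets[i]) {
            encode_bucket(next_bucket, partition_key_end);
            break;
          }
        }
      }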
    
    Prior to this change, this block of code assumed the same hash bucket
    schema for every partition. With per range hash schemas, that is no longer
    necessarily the case. The vector 'partition_idx_to_hash_schemas_idx' maps
    each partition to its index in 'bounds_with_hash_schemas' so that the
    correct hash bucket schema is used. A value of '-1' signifies the use of
    the table-wide hash schema.
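
    A hypothetical, simplified illustration of that lookup (the struct names
    below are placeholders, not the real Kudu definitions):

      #include <cstdint>
      #include <vector>

      struct HashSchema { int32_t num_buckets; };
      struct BoundWithHashSchemas { std::vector<HashSchema> hash_schemas; };

      // Pick the hash schema for hash level 'hash_level' of partition
      // 'partition_idx'; an index of -1 selects the table-wide schema.
      const HashSchema& SchemaForHashLevel(
          int partition_idx, int hash_level,
          const std::vector<int>& partition_idx_to_hash_schemas_idx,
          const std::vector<BoundWithHashSchemas>& bounds_with_hash_schemas,
          const std::vector<HashSchema>& table_wide_hash_schemas) {
        const int idx = partition_idx_to_hash_schemas_idx[partition_idx];
        if (idx == -1) {
          return table_wide_hash_schemas[hash_level];
        }
        return bounds_with_hash_schemas[idx].hash_schemas[hash_level];
      }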
    
    Change-Id: I5f6c709e211359b04f7597af5f670c787bda7481
    Reviewed-on: http://gerrit.cloudera.org:8080/17090
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Tested-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/common/partition-test.cc | 139 ++++++++++++++++++++++++++++++++++++++
 src/kudu/common/partition.cc      |  38 +++++++++--
 2 files changed, 172 insertions(+), 5 deletions(-)

diff --git a/src/kudu/common/partition-test.cc b/src/kudu/common/partition-test.cc
index 03670a4..70b63f8 100644
--- a/src/kudu/common/partition-test.cc
+++ b/src/kudu/common/partition-test.cc
@@ -1089,4 +1089,143 @@ TEST_F(PartitionTest, TestVaryingHashSchemasPerRange) {
             "range hash schemas.", s1.ToString());
 
 }
+
+TEST_F(PartitionTest, TestVaryingHashSchemasPerUnboundedRanges) {
+  // CREATE TABLE t (a VARCHAR, b VARCHAR, c VARCHAR, PRIMARY KEY (a, b, c))
+  // PARTITION BY [HASH BUCKET (a, c), HASH BUCKET (b), RANGE (a, b, c)];
+  Schema schema({ ColumnSchema("a", STRING),
+                  ColumnSchema("b", STRING),
+                  ColumnSchema("c", STRING) },
+                { ColumnId(0), ColumnId(1), ColumnId(2) }, 3);
+
+  PartitionSchemaPB schema_builder;
+  // Table-wide hash schema defined below.
+  AddHashBucketComponent(&schema_builder, { "b" }, 2, 0);
+  PartitionSchema partition_schema;
+  ASSERT_OK(PartitionSchema::FromPB(schema_builder, schema, &partition_schema));
+
+  ASSERT_EQ("HASH (b) PARTITIONS 2, RANGE (a, b, c)",
+            partition_schema.DebugString(schema));
+
+  vector<pair<KuduPartialRow, KuduPartialRow>> bounds;
+  PartitionSchema::RangeHashSchema range_hash_schemas;
+
+  { // [(_, _, _), (a1, _, c1))
+    KuduPartialRow lower(&schema);
+    KuduPartialRow upper(&schema);
+    ASSERT_OK(upper.SetStringCopy("a", "a1"));
+    ASSERT_OK(upper.SetStringCopy("c", "c1"));
+    PartitionSchema::HashBucketSchemas hash_schema_4_buckets = {{{ColumnId(0)}, 4, 0}};
+    bounds.emplace_back(lower, upper);
+    range_hash_schemas.emplace_back(hash_schema_4_buckets);
+  }
+
+  { // [(a2, b2, _), (a3, b3, _))
+    KuduPartialRow lower(&schema);
+    KuduPartialRow upper(&schema);
+    ASSERT_OK(lower.SetStringCopy("a", "a2"));
+    ASSERT_OK(lower.SetStringCopy("b", "b2"));
+    ASSERT_OK(upper.SetStringCopy("a", "a3"));
+    ASSERT_OK(upper.SetStringCopy("b", "b3"));
+    bounds.emplace_back(lower, upper);
+    range_hash_schemas.emplace_back(PartitionSchema::HashBucketSchemas());
+  }
+
+  { // [(a4, b4, _), (_, _, _))
+    KuduPartialRow lower(&schema);
+    KuduPartialRow upper(&schema);
+    ASSERT_OK(lower.SetStringCopy("a", "a4"));
+    ASSERT_OK(lower.SetStringCopy("b", "b4"));
+    PartitionSchema::HashBucketSchemas hash_schema_2_buckets_by_3 = {
+        {{ColumnId(0)}, 2, 0},
+        {{ColumnId(2)}, 3, 0}
+    };
+    bounds.emplace_back(lower, upper);
+    range_hash_schemas.emplace_back(hash_schema_2_buckets_by_3);
+  }
+
+  vector<Partition> partitions;
+  ASSERT_OK(partition_schema.CreatePartitions({}, bounds, range_hash_schemas, schema, &partitions));
+  ASSERT_EQ(12, partitions.size());
+  // The partitions below are sorted by range. To verify that the partition keyspace is filled,
+  // check that the first partition's start key and the last partition's end key are empty.
+
+  EXPECT_EQ(0, partitions[0].hash_buckets()[0]);
+  EXPECT_EQ("", partitions[0].range_key_start());
+  EXPECT_EQ(string("a1\0\0\0\0c1", 8), partitions[0].range_key_end());
+  EXPECT_EQ("", partitions[0].partition_key_start());
+  EXPECT_EQ(string("\0\0\0\0" "a1\0\0\0\0c1", 12), partitions[0].partition_key_end());
+
+  EXPECT_EQ(1, partitions[1].hash_buckets()[0]);
+  EXPECT_EQ("", partitions[1].range_key_start());
+  EXPECT_EQ(string("a1\0\0\0\0c1", 8), partitions[1].range_key_end());
+  EXPECT_EQ(string("\0\0\0\1", 4),partitions[1].partition_key_start());
+  EXPECT_EQ(string("\0\0\0\1" "a1\0\0\0\0c1", 12), partitions[1].partition_key_end());
+
+  EXPECT_EQ(2, partitions[2].hash_buckets()[0]);
+  EXPECT_EQ("", partitions[2].range_key_start());
+  EXPECT_EQ(string("a1\0\0\0\0c1", 8), partitions[2].range_key_end());
+  EXPECT_EQ(string("\0\0\0\2", 4), partitions[2].partition_key_start());
+  EXPECT_EQ(string("\0\0\0\2" "a1\0\0\0\0c1", 12), partitions[2].partition_key_end());
+
+  EXPECT_EQ(3, partitions[3].hash_buckets()[0]);
+  EXPECT_EQ("", partitions[3].range_key_start());
+  EXPECT_EQ(string("a1\0\0\0\0c1", 8), partitions[3].range_key_end());
+  EXPECT_EQ(string("\0\0\0\3", 4), partitions[3].partition_key_start());
+  EXPECT_EQ(string("\0\0\0\3" "a1\0\0\0\0c1", 12), partitions[3].partition_key_end());
+
+  EXPECT_EQ(0, partitions[4].hash_buckets()[0]);
+  EXPECT_EQ(string("a2\0\0b2\0\0", 8), partitions[4].range_key_start());
+  EXPECT_EQ(string("a3\0\0b3\0\0", 8), partitions[4].range_key_end());
+  EXPECT_EQ(string("\0\0\0\0" "a2\0\0b2\0\0", 12), partitions[4].partition_key_start());
+  EXPECT_EQ(string("\0\0\0\0" "a3\0\0b3\0\0", 12), partitions[4].partition_key_end());
+
+  EXPECT_EQ(1, partitions[5].hash_buckets()[0]);
+  EXPECT_EQ(string("a2\0\0b2\0\0", 8), partitions[5].range_key_start());
+  EXPECT_EQ(string("a3\0\0b3\0\0", 8), partitions[5].range_key_end());
+  EXPECT_EQ(string("\0\0\0\1" "a2\0\0b2\0\0", 12), partitions[5].partition_key_start());
+  EXPECT_EQ(string("\0\0\0\1" "a3\0\0b3\0\0", 12), partitions[5].partition_key_end());
+
+  EXPECT_EQ(0, partitions[6].hash_buckets()[0]);
+  EXPECT_EQ(0, partitions[6].hash_buckets()[1]);
+  EXPECT_EQ(string("a4\0\0b4\0\0", 8), partitions[6].range_key_start());
+  EXPECT_EQ("", partitions[6].range_key_end());
+  EXPECT_EQ(string("\0\0\0\0" "\0\0\0\0" "a4\0\0b4\0\0", 16), partitions[6].partition_key_start());
+  EXPECT_EQ(string("\0\0\0\0" "\0\0\0\1", 8), partitions[6].partition_key_end());
+
+  EXPECT_EQ(0, partitions[7].hash_buckets()[0]);
+  EXPECT_EQ(1, partitions[7].hash_buckets()[1]);
+  EXPECT_EQ(string("a4\0\0b4\0\0", 8), partitions[7].range_key_start());
+  EXPECT_EQ("", partitions[7].range_key_end());
+  EXPECT_EQ(string("\0\0\0\0" "\0\0\0\1" "a4\0\0b4\0\0", 16),partitions[7].partition_key_start());
+  EXPECT_EQ(string("\0\0\0\0" "\0\0\0\2", 8), partitions[7].partition_key_end());
+
+  EXPECT_EQ(0, partitions[8].hash_buckets()[0]);
+  EXPECT_EQ(2, partitions[8].hash_buckets()[1]);
+  EXPECT_EQ(string("a4\0\0b4\0\0", 8), partitions[8].range_key_start());
+  EXPECT_EQ("", partitions[8].range_key_end());
+  EXPECT_EQ(string("\0\0\0\0" "\0\0\0\2" "a4\0\0b4\0\0", 16), partitions[8].partition_key_start());
+  EXPECT_EQ(string("\0\0\0\1", 4), partitions[8].partition_key_end());
+
+  EXPECT_EQ(1, partitions[9].hash_buckets()[0]);
+  EXPECT_EQ(0, partitions[9].hash_buckets()[1]);
+  EXPECT_EQ(string("a4\0\0b4\0\0", 8), partitions[9].range_key_start());
+  EXPECT_EQ("", partitions[9].range_key_end());
+  EXPECT_EQ(string("\0\0\0\1" "\0\0\0\0" "a4\0\0b4\0\0", 16), partitions[9].partition_key_start());
+  EXPECT_EQ(string("\0\0\0\1" "\0\0\0\1", 8), partitions[9].partition_key_end());
+
+  EXPECT_EQ(1, partitions[10].hash_buckets()[0]);
+  EXPECT_EQ(1, partitions[10].hash_buckets()[1]);
+  EXPECT_EQ(string("a4\0\0b4\0\0", 8), partitions[10].range_key_start());
+  EXPECT_EQ("", partitions[10].range_key_end());
+  EXPECT_EQ(string("\0\0\0\1" "\0\0\0\1" "a4\0\0b4\0\0", 16),partitions[10].partition_key_start());
+  EXPECT_EQ(string("\0\0\0\1" "\0\0\0\2", 8), partitions[10].partition_key_end());
+
+  EXPECT_EQ(1, partitions[11].hash_buckets()[0]);
+  EXPECT_EQ(2, partitions[11].hash_buckets()[1]);
+  EXPECT_EQ(string("a4\0\0b4\0\0", 8), partitions[11].range_key_start());
+  EXPECT_EQ("", partitions[11].range_key_end());
+  EXPECT_EQ(string("\0\0\0\1" "\0\0\0\2" "a4\0\0b4\0\0", 16), partitions[11].partition_key_start());
+  EXPECT_EQ("", partitions[11].partition_key_end());
+}
 } // namespace kudu
diff --git a/src/kudu/common/partition.cc b/src/kudu/common/partition.cc
index 6b2018e..ae276c1 100644
--- a/src/kudu/common/partition.cc
+++ b/src/kudu/common/partition.cc
@@ -449,6 +449,11 @@ Status PartitionSchema::CreatePartitions(const vector<KuduPartialRow>& split_row
   RETURN_NOT_OK(EncodeRangeSplits(split_rows, schema, &splits));
   RETURN_NOT_OK(SplitRangeBounds(schema, std::move(splits), &bounds_with_hash_schemas));
 
+  // Maps each partition to its respective hash schemas within 'bounds_with_hash_schemas'.
+  // This is needed later in this function by the logic that fills in holes in the partition
+  // keyspace. The vector is empty if no per range hash schemas are used.
+  vector<int> partition_idx_to_hash_schemas_idx;
+
   if (!range_hash_schemas.empty()) {
     // The number of ranges should match the size of range_hash_schemas.
     DCHECK_EQ(range_hash_schemas.size(), bounds_with_hash_schemas.size());
@@ -456,7 +461,8 @@ Status PartitionSchema::CreatePartitions(const vector<KuduPartialRow>& split_row
     DCHECK(split_rows.empty());
     vector<Partition> result_partitions;
     // Iterate through each bound and its hash schemas to generate hash partitions.
-    for (const auto& bound : bounds_with_hash_schemas) {
+    for (int i = 0; i < bounds_with_hash_schemas.size(); i++) {
+      const auto& bound = bounds_with_hash_schemas[i];
       const auto& current_range_hash_schemas = bound.hash_schemas;
       vector<Partition> current_bound_hash_partitions;
       // If current bound's HashBucketSchema is empty, implies use of default table-wide schema.
@@ -467,20 +473,23 @@ Status PartitionSchema::CreatePartitions(const vector<KuduPartialRow>& split_row
         current_bound_hash_partitions = GenerateHashPartitions(current_range_hash_schemas,
                                                                hash_encoder);
       }
-      // Adds range part to partition key.
+      // Add range part to partition key.
       for (Partition& partition : current_bound_hash_partitions) {
         partition.partition_key_start_.append(bound.lower);
         partition.partition_key_end_.append(bound.upper);
+        int index = current_range_hash_schemas.empty() ? -1 : i;
+        partition_idx_to_hash_schemas_idx.emplace_back(index);
       }
       result_partitions.insert(result_partitions.end(),
                                std::make_move_iterator(current_bound_hash_partitions.begin()),
                                std::make_move_iterator(current_bound_hash_partitions.end()));
     }
+    DCHECK_EQ(partition_idx_to_hash_schemas_idx.size(), result_partitions.size());
     *partitions = std::move(result_partitions);
   } else {
     // Create a partition per range bound and hash bucket combination.
     vector<Partition> new_partitions;
-    for (const Partition &base_partition : base_hash_partitions) {
+    for (const Partition& base_partition : base_hash_partitions) {
       for (const auto& bound : bounds_with_hash_schemas) {
         Partition partition = base_partition;
         partition.partition_key_start_.append(bound.lower);
@@ -508,7 +517,11 @@ Status PartitionSchema::CreatePartitions(const vector<KuduPartialRow>& split_row
   // the absolute start and end case, these holes are filled by clearing the
   // partition key beginning at the hash component. For a concrete example,
   // see PartitionTest::TestCreatePartitions.
-  for (Partition& partition : *partitions) {
+  const HashBucketSchema* hash_bucket_schema;
+  for (int j = 0; j < partitions->size(); j++) {
+    Partition& partition = (*partitions)[j];
+    // Find the first zero-valued bucket from the end and truncate the partition key
+    // starting from that bucket onwards for zero-valued buckets.
     if (partition.range_key_start().empty()) {
       for (int i = static_cast<int>(partition.hash_buckets().size()) - 1; i >= 0; i--) {
         if (partition.hash_buckets()[i] != 0) {
@@ -517,11 +530,26 @@ Status PartitionSchema::CreatePartitions(const vector<KuduPartialRow>& split_row
         partition.partition_key_start_.erase(kEncodedBucketSize * i);
       }
     }
+    // Starting from the last hash bucket, truncate the partition key until we hit the first
+    // non-max-valued bucket, at which point, replace the encoding with the next-incremented
+    // bucket value. For example, the following range end partition keys should be transformed,
+    // where the key is (hash_comp1, hash_comp2, range_comp):
+    //
+    // [ (0, 0, "a2b2") -> (0, 1, "a2b2") ]
+    // [ (0, 1, "a2b2") -> (1, _, "a2b2") ]
+    // [ (1, 0, "a2b2") -> (1, 1, "a2b2") ]
+    // [ (1, 1, "a2b2") -> (_, _, "a2b2") ]
     if (partition.range_key_end().empty()) {
       for (int i = static_cast<int>(partition.hash_buckets().size()) - 1; i >= 0; i--) {
         partition.partition_key_end_.erase(kEncodedBucketSize * i);
         int32_t hash_bucket = partition.hash_buckets()[i] + 1;
-        if (hash_bucket != hash_bucket_schemas_[i].num_buckets) {
+        if (range_hash_schemas.empty() || partition_idx_to_hash_schemas_idx[j] == -1) {
+          hash_bucket_schema = &hash_bucket_schemas_[i];
+        } else {
+          const auto& hash_schemas_idx = partition_idx_to_hash_schemas_idx[j];
+          hash_bucket_schema = &bounds_with_hash_schemas[hash_schemas_idx].hash_schemas[i];
+        }
+        if (hash_bucket != hash_bucket_schema->num_buckets) {
           hash_encoder.Encode(&hash_bucket, &partition.partition_key_end_);
           break;
         }