You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2017/10/09 23:52:39 UTC

[1/2] kudu git commit: KUDU-2173: Partitions are incorrectly pruned when range-partitioned on a PK prefix

Repository: kudu
Updated Branches:
  refs/heads/master 29a7568dd -> 1d928951e


KUDU-2173: Partitions are incorrectly pruned when range-partitioned on a PK prefix

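The partition pruner mistakenly treated the range-partition-key prefix of an
exclusive upper-bound primary key as an exclusive bound. When the range-partition
columns are a proper prefix of the primary key columns, that prefix is in fact
an inclusive bound unless all of the remaining primary key columns in the
bound are set to their minimum values.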

This is a C++-only bug; the Java client only attempts to use the PK as
the range partition bound when the primary key columns match the range
partition columns exactly (see KUDU-2178). Regardless, I added Java
regression tests so that this case is covered once the Java pruner is
improved.

Change-Id: I38752f50c0910cd157a912eaa272c76a1a0d9b59
Reviewed-on: http://gerrit.cloudera.org:8080/8222
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/0c823987
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/0c823987
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/0c823987

Branch: refs/heads/master
Commit: 0c823987fd17698e523b812ba8e923ca0085b1e4
Parents: 29a7568
Author: Dan Burkert <da...@apache.org>
Authored: Thu Oct 5 19:21:43 2017 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Mon Oct 9 23:22:08 2017 +0000

----------------------------------------------------------------------
 .../apache/kudu/client/TestPartitionPruner.java |  88 ++++++-
 src/kudu/common/key_util.cc                     |   7 +-
 src/kudu/common/key_util.h                      |   4 +
 src/kudu/common/partial_row.h                   |   3 +-
 src/kudu/common/partition_pruner-test.cc        | 233 ++++++++++++++++---
 src/kudu/common/partition_pruner.cc             |  27 ++-
 6 files changed, 320 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/0c823987/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartitionPruner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartitionPruner.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartitionPruner.java
index 7aec3a0..1e7605b 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartitionPruner.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartitionPruner.java
@@ -208,7 +208,6 @@ public class TestPartitionPruner extends BaseKuduTest {
 
     byte min = Byte.MIN_VALUE;
 
-
     // No bounds
     checkPartitionsPrimaryKey(3, table, partitions,
                               null, null);
@@ -217,6 +216,14 @@ public class TestPartitionPruner extends BaseKuduTest {
     checkPartitionsPrimaryKey(1, table, partitions,
                               null, new byte[] { -1, min, min });
 
+    // PK < (0, 0, 0)
+    checkPartitionsPrimaryKey(1, table, partitions,
+                              null, new byte[] { 0, 0, 0 });
+
+    // PK < (0, 0, min)
+    checkPartitionsPrimaryKey(1, table, partitions,
+                              null, new byte[] { 0, 0, min });
+
     // PK < (10, 10, 10)
     checkPartitionsPrimaryKey(2, table, partitions,
                               null, new byte[] { 10, 10, 10 });
@@ -256,7 +263,86 @@ public class TestPartitionPruner extends BaseKuduTest {
     // PK >= (10, 10, 11)
     checkPartitionsPrimaryKey(0, table, partitions,
                               new byte[] { 10, 0, 0 }, new byte[] { 0, 0, 0 });
+  }
+
+  @Test
+  public void testPrimaryKeyPrefixRangePruning() throws Exception {
+    // CREATE TABLE t
+    // (a INT8, b INT8, c INT8, PRIMARY KEY (a, b, c))
+    // PARTITION BY RANGE (a, b)
+    //    (PARTITION VALUES < (0, 0),
+    //     PARTITION VALUES >= (0, 0));
+
+    ArrayList<ColumnSchema> columns = new ArrayList<>(3);
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("a", Type.INT8).key(true).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("b", Type.INT8).key(true).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c", Type.INT8).key(true).build());
+    Schema schema = new Schema(columns);
+
+    CreateTableOptions tableBuilder = new CreateTableOptions();
+    tableBuilder.setRangePartitionColumns(ImmutableList.of("a", "b"));
+
+    PartialRow split = schema.newPartialRow();
+    split.addByte("a", (byte) 0);
+    split.addByte("b", (byte) 0);
+    tableBuilder.addSplitRow(split);
+
+    String tableName = "testPrimaryKeyPrefixRangePruning-" + System.currentTimeMillis();
+
+    syncClient.createTable(tableName, schema, tableBuilder);
+    KuduTable table = syncClient.openTable(tableName);
+    List<Partition> partitions = getTablePartitions(table);
 
+    byte min = Byte.MIN_VALUE;
+    byte max = Byte.MAX_VALUE;
+
+    // No bounds
+    checkPartitionsPrimaryKey(2, table, partitions,
+                              null, null);
+
+    // PK < (-1, min, min)
+    // TODO(KUDU-2178): prune the upper partition.
+    checkPartitionsPrimaryKey(2, table, partitions,
+                              null, new byte[] { -1, min, min });
+
+    // PK < (0, 0, min)
+    // TODO(KUDU-2178): prune the upper partition.
+    checkPartitionsPrimaryKey(2, table, partitions,
+                              null, new byte[] { 0, 0, min });
+
+    // PK < (0, 0, 0)
+    checkPartitionsPrimaryKey(2, table, partitions,
+                              null, new byte[] { 0, 0, 0 });
+
+    // PK < (0, 1, min)
+    checkPartitionsPrimaryKey(2, table, partitions,
+                              null, new byte[] { 0, 1, min });
+
+    // PK < (0, 1, 0)
+    checkPartitionsPrimaryKey(2, table, partitions,
+                              null, new byte[] { 0, 1, 0 });
+
+    // PK < (max, max, min)
+    checkPartitionsPrimaryKey(2, table, partitions,
+                              null, new byte[] { max, max, min });
+
+    // PK < (max, max, 0)
+    checkPartitionsPrimaryKey(2, table, partitions,
+                              null, new byte[] { max, max, 0 });
+
+    // PK >= (0, 0, min)
+    // TODO(KUDU-2178): prune the lower partition.
+    checkPartitionsPrimaryKey(2, table, partitions,
+                              new byte[] { 0, 0, min }, null);
+
+    // PK >= (0, 0, 0)
+    // TODO(KUDU-2178): prune the lower partition.
+    checkPartitionsPrimaryKey(2, table, partitions,
+                              new byte[] { 0, 0, 0 }, null);
+
+    // PK >= (0, -1, 0)
+    checkPartitionsPrimaryKey(2, table, partitions,
+                              new byte[] { 0, -1, 0 }, null);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/kudu/blob/0c823987/src/kudu/common/key_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/key_util.cc b/src/kudu/common/key_util.cc
index 3608b33..c596c63 100644
--- a/src/kudu/common/key_util.cc
+++ b/src/kudu/common/key_util.cc
@@ -330,9 +330,12 @@ int PushLowerBoundKeyPredicates(ColIdxIter first,
 } // anonymous namespace
 
 bool IncrementPrimaryKey(ContiguousRow* row, Arena* arena) {
-  int32_t num_pk_cols = row->schema()->num_key_columns();
+  return IncrementPrimaryKey(row, row->schema()->num_key_columns(), arena);
+}
+
+bool IncrementPrimaryKey(ContiguousRow* row, int32_t num_columns, Arena* arena) {
   return IncrementKey(boost::make_counting_iterator(0),
-                      boost::make_counting_iterator(num_pk_cols),
+                      boost::make_counting_iterator(num_columns),
                       row,
                       arena);
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/0c823987/src/kudu/common/key_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/common/key_util.h b/src/kudu/common/key_util.h
index 36d2ce4..23cae7d 100644
--- a/src/kudu/common/key_util.h
+++ b/src/kudu/common/key_util.h
@@ -58,6 +58,10 @@ namespace key_util {
 // REQUIRES: all primary key columns must be initialized.
 bool IncrementPrimaryKey(ContiguousRow* row, Arena* arena) WARN_UNUSED_RESULT;
 
+// Like 'IncrementPrimaryKey(ContiguousRow*, Arena*)', but only increments up to
+// 'num_columns' columns of the primary key prefix.
+bool IncrementPrimaryKey(ContiguousRow* row, int32_t num_columns, Arena* arena) WARN_UNUSED_RESULT;
+
 // Increments the provided cell in place.
 bool IncrementCell(const ColumnSchema& col, void* cell_ptr, Arena* arena);
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/0c823987/src/kudu/common/partial_row.h
----------------------------------------------------------------------
diff --git a/src/kudu/common/partial_row.h b/src/kudu/common/partial_row.h
index 17bc8d7..845c4e0 100644
--- a/src/kudu/common/partial_row.h
+++ b/src/kudu/common/partial_row.h
@@ -488,8 +488,9 @@ class KUDU_EXPORT KuduPartialRow {
   template<typename KeyTypeWrapper> friend struct client::IntKeysTestSetup;
   template<typename KeyTypeWrapper> friend struct tablet::SliceTypeRowOps;
   template<typename KeyTypeWrapper> friend struct tablet::NumTypeRowOps;
-  FRIEND_TEST(PartitionPrunerTest, TestPrimaryKeyRangePruning);
+  FRIEND_TEST(PartitionPrunerTest, TestIntPartialPrimaryKeyRangePruning);
   FRIEND_TEST(PartitionPrunerTest, TestPartialPrimaryKeyRangePruning);
+  FRIEND_TEST(PartitionPrunerTest, TestPrimaryKeyRangePruning);
 
   template<typename T>
   Status Set(const Slice& col_name, const typename T::cpp_type& val,

http://git-wip-us.apache.org/repos/asf/kudu/blob/0c823987/src/kudu/common/partition_pruner-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/partition_pruner-test.cc b/src/kudu/common/partition_pruner-test.cc
index 0208529..aba59fa 100644
--- a/src/kudu/common/partition_pruner-test.cc
+++ b/src/kudu/common/partition_pruner-test.cc
@@ -40,6 +40,8 @@
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/move.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/auto_release_pool.h"
+#include "kudu/util/memory/arena.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
@@ -67,14 +69,20 @@ void CheckPrunedPartitions(const Schema& schema,
                            size_t remaining_tablets,
                            size_t pruner_ranges) {
 
+  ScanSpec opt_spec(spec);
+  AutoReleasePool p;
+  Arena arena(256, 1024 * 1024);
+  opt_spec.OptimizeScan(schema, &arena, &p, false);
+
   PartitionPruner pruner;
-  pruner.Init(schema, partition_schema, spec);
+  pruner.Init(schema, partition_schema, opt_spec);
 
   SCOPED_TRACE(strings::Substitute("schema: $0", schema.ToString()));
   SCOPED_TRACE(strings::Substitute("partition schema: $0", partition_schema.DebugString(schema)));
   SCOPED_TRACE(strings::Substitute("partition pruner: $0",
                                    pruner.ToString(schema, partition_schema)));
-  SCOPED_TRACE(strings::Substitute("scan spec: $0", spec.ToString(schema)));
+  SCOPED_TRACE(strings::Substitute("optimized scan spec: $0", opt_spec.ToString(schema)));
+  SCOPED_TRACE(strings::Substitute("original  scan spec: $0", spec.ToString(schema)));
 
   int pruned_partitions = count_if(partitions.begin(), partitions.end(),
                                    [&] (const Partition& partition) {
@@ -234,8 +242,8 @@ TEST_F(PartitionPrunerTest, TestPartialPrimaryKeyRangePruning) {
 
   // Applies the specified lower and upper bound primary keys against the
   // schema, and checks that the expected number of partitions are pruned.
-  auto Check = [&] (optional<tuple<int8_t, string>> lower,
-                    optional<tuple<int8_t, string>> upper,
+  auto Check = [&] (optional<tuple<int8_t, string, string>> lower,
+                    optional<tuple<int8_t, string, string>> upper,
                     size_t remaining_tablets ) {
     ScanSpec spec;
     KuduPartialRow lower_bound(&schema);
@@ -246,7 +254,7 @@ TEST_F(PartitionPrunerTest, TestPartialPrimaryKeyRangePruning) {
     if (lower) {
       CHECK_OK(lower_bound.SetInt8("a", get<0>(*lower)));
       CHECK_OK(lower_bound.SetStringCopy("b", get<1>(*lower)));
-      CHECK_OK(lower_bound.SetStringCopy("c", "fuzz"));
+      CHECK_OK(lower_bound.SetStringCopy("c", get<2>(*lower)));
       ConstContiguousRow row(lower_bound.schema(), lower_bound.row_data_);
       enc_lower_bound = EncodedKey::FromContiguousRow(row);
       spec.SetLowerBoundKey(enc_lower_bound.get());
@@ -254,7 +262,7 @@ TEST_F(PartitionPrunerTest, TestPartialPrimaryKeyRangePruning) {
     if (upper) {
       CHECK_OK(upper_bound.SetInt8("a", get<0>(*upper)));
       CHECK_OK(upper_bound.SetStringCopy("b", get<1>(*upper)));
-      CHECK_OK(upper_bound.SetStringCopy("c", "fuzzy"));
+      CHECK_OK(upper_bound.SetStringCopy("c", get<2>(*upper)));
       ConstContiguousRow row(upper_bound.schema(), upper_bound.row_data_);
       enc_upper_bound = EncodedKey::FromContiguousRow(row);
       spec.SetExclusiveUpperBoundKey(enc_upper_bound.get());
@@ -267,46 +275,141 @@ TEST_F(PartitionPrunerTest, TestPartialPrimaryKeyRangePruning) {
   // No bounds
   Check(boost::none, boost::none, 3);
 
-  // PK < (-1, min, _)
-  Check(boost::none, make_tuple<int8_t, string>(-1, ""), 1);
+  // PK < (-1, min, "")
+  Check(boost::none, make_tuple<int8_t, string, string>(-1, "", ""), 1);
+
+  // PK < (10, "r", "")
+  Check(boost::none, make_tuple<int8_t, string, string>(10, "r", ""), 2);
 
-  // PK < (10, "r", _)
-  Check(boost::none, make_tuple<int8_t, string>(10, "r"), 2);
+  // PK < (10, "r", "z")
+  Check(boost::none, make_tuple<int8_t, string, string>(10, "r", "z"), 3);
 
-  // PK < (100, min)
-  Check(boost::none, make_tuple<int8_t, string>(100, ""), 3);
+  // PK < (100, min, "")
+  Check(boost::none, make_tuple<int8_t, string, string>(100, "", ""), 3);
 
-  // PK >= (-10, "m")
-  Check(make_tuple<int8_t, string>(-10, "m"), boost::none, 3);
+  // PK >= (-10, "m", "")
+  Check(make_tuple<int8_t, string, string>(-10, "m", ""), boost::none, 3);
 
-  // PK >= (0, "")
-  Check(make_tuple<int8_t, string>(0, ""), boost::none, 3);
+  // PK >= (0, "", "")
+  Check(make_tuple<int8_t, string, string>(0, "", ""), boost::none, 3);
 
-  // PK >= (0, "m")
-  Check(make_tuple<int8_t, string>(0, "m"), boost::none, 2);
+  // PK >= (0, "m", "")
+  Check(make_tuple<int8_t, string, string>(0, "m", ""), boost::none, 2);
 
-  // PK >= (100, "")
-  Check(make_tuple<int8_t, string>(100, ""), boost::none, 1);
+  // PK >= (100, "", "")
+  Check(make_tuple<int8_t, string, string>(100, "", ""), boost::none, 1);
 
-  // PK >= (-10, 0)
-  // PK  < (100, 0)
-  Check(make_tuple<int8_t, string>(-10, ""),
-        make_tuple<int8_t, string>(100, ""), 3);
+  // PK >= (-10, 0, "")
+  // PK  < (100, 0, "")
+  Check(make_tuple<int8_t, string, string>(-10, "", ""),
+        make_tuple<int8_t, string, string>(100, "", ""), 3);
 
-  // PK >= (0, "m")
-  // PK  < (10, "r")
-  Check(make_tuple<int8_t, string>(0, "m"),
-        make_tuple<int8_t, string>(10, "r"), 1);
+  // PK >= (0, "m", "")
+  // PK  < (10, "r", "")
+  Check(make_tuple<int8_t, string, string>(0, "m", ""),
+        make_tuple<int8_t, string, string>(10, "r", ""), 1);
 
-  // PK >= (0, "")
-  // PK  < (10, "m")
-  Check(make_tuple<int8_t, string>(0, ""),
-        make_tuple<int8_t, string>(10, "m"), 2);
+  // PK >= (0, "m", "")
+  // PK  < (10, "r", "z")
+  Check(make_tuple<int8_t, string, string>(0, "m", ""),
+        make_tuple<int8_t, string, string>(10, "r", "z"), 2);
 
-  // PK >= (10, "m")
-  // PK  < (10, "m")
-  Check(make_tuple<int8_t, string>(10, "m"),
-        make_tuple<int8_t, string>(10, "m"), 1);
+  // PK >= (0, "", "")
+  // PK  < (10, "m", "z")
+  Check(make_tuple<int8_t, string, string>(0, "", ""),
+        make_tuple<int8_t, string, string>(10, "m", "z"), 2);
+
+  // PK >= (10, "m", "")
+  // PK  < (10, "m", "z")
+  Check(make_tuple<int8_t, string, string>(10, "m", ""),
+        make_tuple<int8_t, string, string>(10, "m", "z"), 1);
+}
+
+TEST_F(PartitionPrunerTest, TestIntPartialPrimaryKeyRangePruning) {
+  // CREATE TABLE t
+  // (a INT8, b INT8, c INT8, PRIMARY KEY (a, b, c))
+  // DISTRIBUTE BY RANGE(a, b)
+  // SPLIT ROWS [(0, 0)];
+
+  // Setup the Schema
+  Schema schema({ ColumnSchema("a", INT8),
+                  ColumnSchema("b", INT8),
+                  ColumnSchema("c", INT8) },
+                { ColumnId(0), ColumnId(1), ColumnId(2) },
+                3);
+
+  PartitionSchema partition_schema;
+  auto pb = PartitionSchemaPB();
+  auto range_schema = pb.mutable_range_schema();
+  range_schema->add_columns()->set_name("a");
+  range_schema->add_columns()->set_name("b");
+  ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));
+
+  KuduPartialRow split(&schema);
+  ASSERT_OK(split.SetInt8("a", 0));
+  ASSERT_OK(split.SetInt8("b", 0));
+
+  vector<Partition> partitions;
+  ASSERT_OK(partition_schema.CreatePartitions({ split }, {}, schema, &partitions));
+
+  // Applies the specified lower and upper bound primary keys against the
+  // schema, and checks that the expected number of partitions are pruned.
+  auto Check = [&] (optional<tuple<int8_t, int8_t, int8_t>> lower,
+                    optional<tuple<int8_t, int8_t, int8_t>> upper,
+                    size_t remaining_tablets ) {
+    ScanSpec spec;
+    KuduPartialRow lower_bound(&schema);
+    KuduPartialRow upper_bound(&schema);
+    gscoped_ptr<EncodedKey> enc_lower_bound;
+    gscoped_ptr<EncodedKey> enc_upper_bound;
+
+    if (lower) {
+      CHECK_OK(lower_bound.SetInt8("a", get<0>(*lower)));
+      CHECK_OK(lower_bound.SetInt8("b", get<1>(*lower)));
+      CHECK_OK(lower_bound.SetInt8("c", get<2>(*lower)));
+      ConstContiguousRow row(lower_bound.schema(), lower_bound.row_data_);
+      enc_lower_bound = EncodedKey::FromContiguousRow(row);
+      spec.SetLowerBoundKey(enc_lower_bound.get());
+    }
+    if (upper) {
+      CHECK_OK(upper_bound.SetInt8("a", get<0>(*upper)));
+      CHECK_OK(upper_bound.SetInt8("b", get<1>(*upper)));
+      CHECK_OK(upper_bound.SetInt8("c", get<2>(*upper)));
+      ConstContiguousRow row(upper_bound.schema(), upper_bound.row_data_);
+      enc_upper_bound = EncodedKey::FromContiguousRow(row);
+      spec.SetExclusiveUpperBoundKey(enc_upper_bound.get());
+    }
+    size_t pruner_ranges = remaining_tablets == 0 ? 0 : 1;
+    CheckPrunedPartitions(schema, partition_schema, partitions, spec,
+                          remaining_tablets, pruner_ranges);
+  };
+
+  // No bounds
+  Check(boost::none, boost::none, 2);
+
+  // PK < (0, 0, min)
+  Check(boost::none, make_tuple<int8_t, int8_t, int8_t>(0, 0, INT8_MIN), 1);
+
+  // PK < (0, 0, 0)
+  Check(boost::none, make_tuple<int8_t, int8_t, int8_t>(0, 0, 0), 2);
+
+  // PK < (0, max, 0)
+  Check(boost::none, make_tuple<int8_t, int8_t, int8_t>(0, INT8_MAX, 0), 2);
+
+  // PK < (max, max, min)
+  Check(boost::none, make_tuple<int8_t, int8_t, int8_t>(INT8_MAX, INT8_MAX, INT8_MIN), 2);
+
+  // PK < (max, max, 0)
+  Check(boost::none, make_tuple<int8_t, int8_t, int8_t>(INT8_MAX, INT8_MAX, 0), 2);
+
+  // PK >= (0, 0, 0)
+  Check(make_tuple<int8_t, int8_t, int8_t>(0, 0, 0), boost::none, 1);
+
+  // PK >= (0, 0, -1)
+  Check(make_tuple<int8_t, int8_t, int8_t>(0, 0, -1), boost::none, 1);
+
+  // PK >= (0, 0, min)
+  Check(make_tuple<int8_t, int8_t, int8_t>(0, 0, INT8_MIN), boost::none, 1);
 }
 
 TEST_F(PartitionPrunerTest, TestRangePruning) {
@@ -599,6 +702,7 @@ TEST_F(PartitionPrunerTest, TestInListHashPruning) {
   hash_component_3->add_columns()->set_name("c");
   hash_component_3->set_num_buckets(3);
   hash_component_3->set_seed(0);
+  pb.mutable_range_schema()->clear_columns();
 
   ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));
 
@@ -686,6 +790,7 @@ TEST_F(PartitionPrunerTest, TestMultiColumnInListHashPruning) {
   hash_component_2->add_columns()->set_name("c");
   hash_component_2->set_num_buckets(3);
   hash_component_2->set_seed(0);
+  pb.mutable_range_schema()->clear_columns();
 
   ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));
 
@@ -886,4 +991,60 @@ TEST_F(PartitionPrunerTest, TestPruning) {
   Check({ ColumnPredicate::Equality(schema.column(2), &ten) },
         string("\0\0\0\1", 4), "", 1, 1);
 }
+
+TEST_F(PartitionPrunerTest, TestKudu2173) {
+  // CREATE TABLE t
+  // (a INT8, b INT8, PRIMARY KEY (a, b))
+  // DISTRIBUTE BY RANGE(a)
+  // SPLIT ROWS [(10)]
+
+  // Setup the Schema
+  Schema schema({ ColumnSchema("a", INT8),
+          ColumnSchema("b", INT8)},
+    { ColumnId(0), ColumnId(1) },
+    2);
+
+  PartitionSchema partition_schema;
+  auto pb = PartitionSchemaPB();
+  auto range_schema = pb.mutable_range_schema();
+  range_schema->add_columns()->set_name("a");
+  ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));
+
+  KuduPartialRow split1(&schema);
+  ASSERT_OK(split1.SetInt8("a", 10));
+  vector<Partition> partitions;
+  ASSERT_OK(partition_schema.CreatePartitions({ split1 }, {}, schema, &partitions));
+
+  // Applies the specified predicates to a scan and checks that the expected
+  // number of partitions are pruned.
+  auto Check = [&] (const vector<ColumnPredicate>& predicates, size_t remaining_tablets) {
+    ScanSpec spec;
+
+    for (const auto& pred : predicates) {
+      spec.AddPredicate(pred);
+    }
+    size_t pruner_ranges = remaining_tablets == 0 ? 0 : 1;
+    CheckPrunedPartitions(schema, partition_schema, partitions, spec,
+                          remaining_tablets, pruner_ranges);
+  };
+
+  int8_t eleven = 11;
+  int8_t max = INT8_MAX;
+
+  // a < 11
+  Check({ ColumnPredicate::Range(schema.column(0), nullptr, &eleven) }, 2);
+
+  // a < 11 AND b < 11
+  Check({ ColumnPredicate::Range(schema.column(0), nullptr, &eleven),
+          ColumnPredicate::Range(schema.column(1), nullptr, &eleven) },
+        2);
+
+  // a < max
+  Check({ ColumnPredicate::Range(schema.column(0), nullptr, &max) }, 2);
+
+  // a < max AND b < 11
+  Check({ ColumnPredicate::Range(schema.column(0), nullptr, &max),
+          ColumnPredicate::Range(schema.column(1), nullptr, &eleven) },
+        2);
+}
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/0c823987/src/kudu/common/partition_pruner.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/partition_pruner.cc b/src/kudu/common/partition_pruner.cc
index a05c927..e3bbebf 100644
--- a/src/kudu/common/partition_pruner.cc
+++ b/src/kudu/common/partition_pruner.cc
@@ -99,8 +99,9 @@ void EncodeRangeKeysFromPrimaryKeyBounds(const Schema& schema,
       *range_key_end = scan_spec.exclusive_upper_bound_key()->encoded_key().ToString();
     }
   } else {
-    // The range columns are a prefix of the primary key columns. Copy
-    // the column values over to a row, and then encode the row as a range key.
+    // The range-partition key columns are a prefix of the primary key columns.
+    // Copy the column values over to a row, and then encode the row as a range
+    // key.
 
     vector<int32_t> col_idxs(num_range_columns);
     iota(col_idxs.begin(), col_idxs.end(), 0);
@@ -123,6 +124,28 @@ void EncodeRangeKeysFromPrimaryKeyBounds(const Schema& schema,
                scan_spec.exclusive_upper_bound_key()->raw_keys()[idx],
                schema.column(idx).type_info()->size());
       }
+
+      // Determine if the upper bound primary key columns which aren't in the
+      // range-partition key are all set to the minimum value. If so, the
+      // range-partition key prefix of the primary key is already effectively an
+      // exclusive bound. If not, then we increment the range-key prefix in
+      // order to transform it from inclusive to exclusive.
+      bool min_suffix = true;
+      for (int32_t idx = num_range_columns; idx < schema.num_key_columns(); idx++) {
+        min_suffix &= schema.column(idx)
+                            .type_info()
+                            ->IsMinValue(scan_spec.exclusive_upper_bound_key()->raw_keys()[idx]);
+      }
+      Arena arena(std::max<size_t>(Arena::kMinimumChunkSize, schema.key_byte_size()), 4096);
+      if (!min_suffix) {
+        if (!key_util::IncrementPrimaryKey(&row, num_range_columns, &arena)) {
+          // The range-partition key upper bound can't be incremented, which
+          // means it's an inclusive bound on the maximum possible value, so
+          // skip it.
+          return;
+        }
+      }
+
       key_util::EncodeKey(col_idxs, row, range_key_end);
     }
   }


[2/2] kudu git commit: python: replace bespoke minicluster implementation with control shell

Posted by ad...@apache.org.
python: replace bespoke minicluster implementation with control shell

In this case we're using JSON serialization to talk to the shell because the
Python bindings don't already make use of protobuf.

I had to update test_scantoken.py to shut down its multiprocessing pool
after using it; otherwise, its subprocesses inherited a copy of the writer
end of the control shell's stdin pipe. This led to hangs in stop_cluster():
closing the control shell's stdin in the test process didn't close the last
writer, so the control shell never exited.

Change-Id: I821e864cfe738a4d39ae039b95ca38f16cdcfb82
Reviewed-on: http://gerrit.cloudera.org:8080/8236
Tested-by: Kudu Jenkins
Reviewed-by: Wes McKinney <we...@apache.org>
Reviewed-by: Dan Burkert <da...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/1d928951
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/1d928951
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/1d928951

Branch: refs/heads/master
Commit: 1d928951e375c0cdb6c3c151c5bacb3779805f5f
Parents: 0c82398
Author: Adar Dembo <ad...@cloudera.com>
Authored: Sun Oct 8 18:07:05 2017 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Mon Oct 9 23:52:19 2017 +0000

----------------------------------------------------------------------
 python/kudu/tests/common.py         | 140 ++++++++++---------------------
 python/kudu/tests/test_scantoken.py |   4 +-
 2 files changed, 45 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/1d928951/python/kudu/tests/common.py
----------------------------------------------------------------------
diff --git a/python/kudu/tests/common.py b/python/kudu/tests/common.py
index c6b1b7d..9596eaa 100644
--- a/python/kudu/tests/common.py
+++ b/python/kudu/tests/common.py
@@ -19,13 +19,8 @@
 from __future__ import division
 
 import json
-import fnmatch
 import os
-import shutil
 import subprocess
-import tempfile
-import time
-import socket
 
 import kudu
 from kudu.client import Partitioning
@@ -42,14 +37,22 @@ class KuduTestBase(object):
     tablet servers.
     """
 
-    BASE_PORT = 37000
-    NUM_TABLET_SERVERS = 3
-    TSERVER_START_TIMEOUT_SECS = 10
     NUM_MASTER_SERVERS = 3
+    NUM_TABLET_SERVERS = 3
+
+    @classmethod
+    def send_and_receive(cls, proc, request):
+        binary_req = (json.dumps(request) + "\n").encode("utf-8")
+        proc.stdin.write(binary_req)
+        proc.stdin.flush()
+        binary_resp = proc.stdout.readline()
+        response = json.loads(binary_resp[:-1].decode("utf-8"))
+        if "error" in response:
+            raise Exception("Error in response: {0}".format(response["error"]))
+        return response
 
     @classmethod
     def start_cluster(cls):
-        local_path = tempfile.mkdtemp(dir=os.getenv("TEST_TMPDIR"))
         kudu_build = os.getenv("KUDU_BUILD")
         if not kudu_build:
             kudu_build = os.path.join(os.getenv("KUDU_HOME"), "build", "latest")
@@ -58,101 +61,42 @@ class KuduTestBase(object):
         master_hosts = []
         master_ports = []
 
-        # We need to get the port numbers for the masters before starting them
-        # so that we can appropriately configure a multi-master.
-        for m in range(cls.NUM_MASTER_SERVERS):
-            master_hosts.append('127.0.0.1')
-            # This introduces a race
-            s = socket.socket()
-            s.bind(('', 0))
-            master_ports.append(s.getsockname()[1])
-            s.close()
-
-        multi_master_string = ','.join('{0}:{1}'.format(host, port)
-                                       for host, port
-                                       in zip(master_hosts, master_ports))
-
-        for m in range(cls.NUM_MASTER_SERVERS):
-            os.makedirs("{0}/master/{1}".format(local_path, m))
-            os.makedirs("{0}/master/{1}/data".format(local_path, m))
-            os.makedirs("{0}/master/{1}/logs".format(local_path, m))
-
-
-            path = [
-                "{0}/kudu-master".format(bin_path),
-                "-unlock_unsafe_flags",
-                "-unlock_experimental_flags",
-                "-rpc_server_allow_ephemeral_ports",
-                "-rpc_bind_addresses=0.0.0.0:{0}".format(master_ports[m]),
-                "-fs_wal_dir={0}/master/{1}/data".format(local_path, m),
-                "-fs_data_dirs={0}/master/{1}/data".format(local_path, m),
-                "-log_dir={0}/master/{1}/logs".format(local_path, m),
-                "-logtostderr",
-                "-webserver_port=0",
-                "-master_addresses={0}".format(multi_master_string),
-                # Only make one replica so that our tests don't need to worry about
-                # setting consistency modes.
-                "-default_num_replicas=1"
-            ]
-
-            p = subprocess.Popen(path, shell=False)
-            fid = open("{0}/master/{1}/kudu-master.pid".format(local_path, m), "w+")
-            fid.write("{0}".format(p.pid))
-            fid.close()
-
-        for m in range(cls.NUM_TABLET_SERVERS):
-            os.makedirs("{0}/ts/{1}".format(local_path, m))
-            os.makedirs("{0}/ts/{1}/logs".format(local_path, m))
-
-            path = [
-                "{0}/kudu-tserver".format(bin_path),
-                "-unlock_unsafe_flags",
-                "-unlock_experimental_flags",
-                "-rpc_server_allow_ephemeral_ports",
-                "-rpc_bind_addresses=0.0.0.0:0",
-                "-tserver_master_addrs={0}".format(multi_master_string),
-                "-webserver_port=0",
-                "-log_dir={0}/ts/{1}/logs".format(local_path, m),
-                "-logtostderr",
-                "-fs_data_dirs={0}/ts/{1}/data".format(local_path, m),
-                "-fs_wal_dir={0}/ts/{1}/data".format(local_path, m),
-            ]
-            p = subprocess.Popen(path, shell=False)
-            tserver_pid = "{0}/ts/{1}/kudu-tserver.pid".format(local_path, m)
-            fid = open(tserver_pid, "w+")
-            fid.write("{0}".format(p.pid))
-            fid.close()
-
-        return local_path, master_hosts, master_ports
+        # Start the mini-cluster control process.
+        args = ["{0}/kudu".format(bin_path), "test", "mini_cluster"]
+        p = subprocess.Popen(args, shell=False,
+                             stdin=subprocess.PIPE, stdout=subprocess.PIPE)
+
+        # Create and start a cluster.
+        #
+        # Only make one replica so that our tests don't need to worry about
+        # setting consistency modes.
+        cls.send_and_receive(
+            p, { "create_cluster" :
+                 { "numMasters" : cls.NUM_MASTER_SERVERS,
+                   "numTservers" : cls.NUM_TABLET_SERVERS,
+                   "extraMasterFlags" : [ "--default_num_replicas=1" ]}})
+        cls.send_and_receive(p, { "start_cluster" : {}})
+
+        # Get information about the cluster's masters.
+        masters = cls.send_and_receive(p, { "get_masters" : {}})
+        for m in masters["getMasters"]["masters"]:
+            master_hosts.append(m["boundRpcAddress"]["host"])
+            master_ports.append(m["boundRpcAddress"]["port"])
+
+        return p, master_hosts, master_ports
 
     @classmethod
-    def stop_cluster(cls, path):
-        for root, dirnames, filenames in os.walk('{0}/..'.format(path)):
-            for filename in fnmatch.filter(filenames, '*.pid'):
-                with open(os.path.join(root, filename)) as fid:
-                    a = fid.read()
-                    r = subprocess.Popen(["kill", "{0}".format(a)])
-                    r.wait()
-                    os.remove(os.path.join(root, filename))
-        shutil.rmtree(path, True)
+    def stop_cluster(cls):
+        cls.cluster_proc.stdin.close()
+        ret = cls.cluster_proc.wait()
+        if ret != 0:
+            raise Exception("Minicluster process exited with code {0}".format(ret))
 
     @classmethod
     def setUpClass(cls):
-        cls.cluster_path, cls.master_hosts, cls.master_ports = cls.start_cluster()
-        time.sleep(1)
-
+        cls.cluster_proc, cls.master_hosts, cls.master_ports = cls.start_cluster()
         cls.client = kudu.connect(cls.master_hosts, cls.master_ports)
 
-        # Wait for all tablet servers to start with the configured timeout
-        timeout = time.time() + cls.TSERVER_START_TIMEOUT_SECS
-        while len(cls.client.list_tablet_servers()) < cls.NUM_TABLET_SERVERS:
-            if time.time() > timeout:
-                raise TimeoutError(
-                    "Tablet servers took too long to start. Timeout set to {}"
-                                   .format(cls.TSERVER_START_TIMEOUT_SECS))
-            # Sleep 50 milliseconds to avoid tight-looping rpc
-            time.sleep(0.05)
-
         cls.schema = cls.example_schema()
         cls.partitioning = cls.example_partitioning()
 
@@ -163,7 +107,7 @@ class KuduTestBase(object):
 
     @classmethod
     def tearDownClass(cls):
-        cls.stop_cluster(cls.cluster_path)
+        cls.stop_cluster()
 
     @classmethod
     def example_schema(cls):

http://git-wip-us.apache.org/repos/asf/kudu/blob/1d928951/python/kudu/tests/test_scantoken.py
----------------------------------------------------------------------
diff --git a/python/kudu/tests/test_scantoken.py b/python/kudu/tests/test_scantoken.py
index 392e2a9..c675a92 100644
--- a/python/kudu/tests/test_scantoken.py
+++ b/python/kudu/tests/test_scantoken.py
@@ -53,8 +53,10 @@ class TestScanToken(TestScanBase):
         # Begin process pool
         pool = Pool(len(input))
         results = pool.map(_get_scan_token_results, input)
+        pool.close()
+        pool.join()
 
-        #Validate results
+        # Validate results
         actual_tuples = []
         for result in results:
             actual_tuples += result