You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2017/10/09 23:52:39 UTC
[1/2] kudu git commit: KUDU-2173: Partitions are incorrectly pruned when range-partitioned on a PK prefix
Repository: kudu
Updated Branches:
refs/heads/master 29a7568dd -> 1d928951e
KUDU-2173: Partitions are incorrectly pruned when range-partitioned on a PK prefix
The partition pruner mistakenly treated the range-partition-key prefix of an
exclusive upper-bound primary key as an exclusive bound. In fact, that prefix
is an inclusive bound whenever any of the remaining primary key columns in the
upper bound are greater than their minimum value.
This is a C++-only bug: the Java client only attempts to use the PK as the
range-partition bound when the primary key columns exactly match the
range-partition columns (see KUDU-2178). Nevertheless, I added Java regression
tests so that this case is covered once the Java pruner is improved.
Change-Id: I38752f50c0910cd157a912eaa272c76a1a0d9b59
Reviewed-on: http://gerrit.cloudera.org:8080/8222
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/0c823987
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/0c823987
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/0c823987
Branch: refs/heads/master
Commit: 0c823987fd17698e523b812ba8e923ca0085b1e4
Parents: 29a7568
Author: Dan Burkert <da...@apache.org>
Authored: Thu Oct 5 19:21:43 2017 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Mon Oct 9 23:22:08 2017 +0000
----------------------------------------------------------------------
.../apache/kudu/client/TestPartitionPruner.java | 88 ++++++-
src/kudu/common/key_util.cc | 7 +-
src/kudu/common/key_util.h | 4 +
src/kudu/common/partial_row.h | 3 +-
src/kudu/common/partition_pruner-test.cc | 233 ++++++++++++++++---
src/kudu/common/partition_pruner.cc | 27 ++-
6 files changed, 320 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/0c823987/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartitionPruner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartitionPruner.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartitionPruner.java
index 7aec3a0..1e7605b 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartitionPruner.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestPartitionPruner.java
@@ -208,7 +208,6 @@ public class TestPartitionPruner extends BaseKuduTest {
byte min = Byte.MIN_VALUE;
-
// No bounds
checkPartitionsPrimaryKey(3, table, partitions,
null, null);
@@ -217,6 +216,14 @@ public class TestPartitionPruner extends BaseKuduTest {
checkPartitionsPrimaryKey(1, table, partitions,
null, new byte[] { -1, min, min });
+ // PK < (0, 0, 0)
+ checkPartitionsPrimaryKey(1, table, partitions,
+ null, new byte[] { 0, 0, 0 });
+
+ // PK < (0, 0, min)
+ checkPartitionsPrimaryKey(1, table, partitions,
+ null, new byte[] { 0, 0, min });
+
// PK < (10, 10, 10)
checkPartitionsPrimaryKey(2, table, partitions,
null, new byte[] { 10, 10, 10 });
@@ -256,7 +263,86 @@ public class TestPartitionPruner extends BaseKuduTest {
// PK >= (10, 10, 11)
checkPartitionsPrimaryKey(0, table, partitions,
new byte[] { 10, 0, 0 }, new byte[] { 0, 0, 0 });
+ }
+
+ @Test
+ public void testPrimaryKeyPrefixRangePruning() throws Exception {
+ // CREATE TABLE t
+ // (a INT8, b INT8, c INT8)
+ // PRIMARY KEY (a, b, c))
+ // PARTITION BY RANGE (a, b)
+ // (PARTITION VALUES < (0, 0, 0));
+
+ ArrayList<ColumnSchema> columns = new ArrayList<>(3);
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("a", Type.INT8).key(true).build());
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("b", Type.INT8).key(true).build());
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("c", Type.INT8).key(true).build());
+ Schema schema = new Schema(columns);
+
+ CreateTableOptions tableBuilder = new CreateTableOptions();
+ tableBuilder.setRangePartitionColumns(ImmutableList.of("a", "b"));
+
+ PartialRow split = schema.newPartialRow();
+ split.addByte("a", (byte) 0);
+ split.addByte("b", (byte) 0);
+ tableBuilder.addSplitRow(split);
+
+ String tableName = "testPrimaryKeyPrefixRangePruning-" + System.currentTimeMillis();
+
+ syncClient.createTable(tableName, schema, tableBuilder);
+ KuduTable table = syncClient.openTable(tableName);
+ List<Partition> partitions = getTablePartitions(table);
+ byte min = Byte.MIN_VALUE;
+ byte max = Byte.MAX_VALUE;
+
+ // No bounds
+ checkPartitionsPrimaryKey(2, table, partitions,
+ null, null);
+
+ // PK < (-1, min, min)
+ // TODO(KUDU-2178): prune the upper partition.
+ checkPartitionsPrimaryKey(2, table, partitions,
+ null, new byte[] { -1, min, min });
+
+ // PK < (0, 0, min)
+ // TODO(KUDU-2178): prune the upper partition.
+ checkPartitionsPrimaryKey(2, table, partitions,
+ null, new byte[] { 0, 0, min });
+
+ // PK < (0, 0, 0)
+ checkPartitionsPrimaryKey(2, table, partitions,
+ null, new byte[] { 0, 0, 0 });
+
+ // PK < (0, 1, min)
+ checkPartitionsPrimaryKey(2, table, partitions,
+ null, new byte[] { 0, 1, min });
+
+ // PK < (0, 1, 0)
+ checkPartitionsPrimaryKey(2, table, partitions,
+ null, new byte[] { 0, 1, 0 });
+
+ // PK < (max, max, min)
+ checkPartitionsPrimaryKey(2, table, partitions,
+ null, new byte[] { max, max, min });
+
+ // PK < (max, max, 0)
+ checkPartitionsPrimaryKey(2, table, partitions,
+ null, new byte[] { max, max, 0 });
+
+ // PK >= (0, 0, min)
+ // TODO(KUDU-2178): prune the lower partition.
+ checkPartitionsPrimaryKey(2, table, partitions,
+ new byte[] { 0, 0, min }, null);
+
+ // PK >= (0, 0, 0)
+ // TODO(KUDU-2178): prune the lower partition.
+ checkPartitionsPrimaryKey(2, table, partitions,
+ new byte[] { 0, 0, 0 }, null);
+
+ // PK >= (0, -1, 0)
+ checkPartitionsPrimaryKey(2, table, partitions,
+ new byte[] { 0, -1, 0 }, null);
}
@Test
http://git-wip-us.apache.org/repos/asf/kudu/blob/0c823987/src/kudu/common/key_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/key_util.cc b/src/kudu/common/key_util.cc
index 3608b33..c596c63 100644
--- a/src/kudu/common/key_util.cc
+++ b/src/kudu/common/key_util.cc
@@ -330,9 +330,12 @@ int PushLowerBoundKeyPredicates(ColIdxIter first,
} // anonymous namespace
bool IncrementPrimaryKey(ContiguousRow* row, Arena* arena) {
- int32_t num_pk_cols = row->schema()->num_key_columns();
+ return IncrementPrimaryKey(row, row->schema()->num_key_columns(), arena);
+}
+
+bool IncrementPrimaryKey(ContiguousRow* row, int32_t num_columns, Arena* arena) {
return IncrementKey(boost::make_counting_iterator(0),
- boost::make_counting_iterator(num_pk_cols),
+ boost::make_counting_iterator(num_columns),
row,
arena);
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/0c823987/src/kudu/common/key_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/common/key_util.h b/src/kudu/common/key_util.h
index 36d2ce4..23cae7d 100644
--- a/src/kudu/common/key_util.h
+++ b/src/kudu/common/key_util.h
@@ -58,6 +58,10 @@ namespace key_util {
// REQUIRES: all primary key columns must be initialized.
bool IncrementPrimaryKey(ContiguousRow* row, Arena* arena) WARN_UNUSED_RESULT;
+// Like 'IncrementPrimaryKey(ContiguousRow*, Arena*)', but only increments up to
+// 'num_columns' columns of the primary key prefix.
+bool IncrementPrimaryKey(ContiguousRow* row, int32_t num_columns, Arena* arena) WARN_UNUSED_RESULT;
+
// Increments the provided cell in place.
bool IncrementCell(const ColumnSchema& col, void* cell_ptr, Arena* arena);
http://git-wip-us.apache.org/repos/asf/kudu/blob/0c823987/src/kudu/common/partial_row.h
----------------------------------------------------------------------
diff --git a/src/kudu/common/partial_row.h b/src/kudu/common/partial_row.h
index 17bc8d7..845c4e0 100644
--- a/src/kudu/common/partial_row.h
+++ b/src/kudu/common/partial_row.h
@@ -488,8 +488,9 @@ class KUDU_EXPORT KuduPartialRow {
template<typename KeyTypeWrapper> friend struct client::IntKeysTestSetup;
template<typename KeyTypeWrapper> friend struct tablet::SliceTypeRowOps;
template<typename KeyTypeWrapper> friend struct tablet::NumTypeRowOps;
- FRIEND_TEST(PartitionPrunerTest, TestPrimaryKeyRangePruning);
+ FRIEND_TEST(PartitionPrunerTest, TestIntPartialPrimaryKeyRangePruning);
FRIEND_TEST(PartitionPrunerTest, TestPartialPrimaryKeyRangePruning);
+ FRIEND_TEST(PartitionPrunerTest, TestPrimaryKeyRangePruning);
template<typename T>
Status Set(const Slice& col_name, const typename T::cpp_type& val,
http://git-wip-us.apache.org/repos/asf/kudu/blob/0c823987/src/kudu/common/partition_pruner-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/partition_pruner-test.cc b/src/kudu/common/partition_pruner-test.cc
index 0208529..aba59fa 100644
--- a/src/kudu/common/partition_pruner-test.cc
+++ b/src/kudu/common/partition_pruner-test.cc
@@ -40,6 +40,8 @@
#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/gutil/move.h"
#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/auto_release_pool.h"
+#include "kudu/util/memory/arena.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
@@ -67,14 +69,20 @@ void CheckPrunedPartitions(const Schema& schema,
size_t remaining_tablets,
size_t pruner_ranges) {
+ ScanSpec opt_spec(spec);
+ AutoReleasePool p;
+ Arena arena(256, 1024 * 1024);
+ opt_spec.OptimizeScan(schema, &arena, &p, false);
+
PartitionPruner pruner;
- pruner.Init(schema, partition_schema, spec);
+ pruner.Init(schema, partition_schema, opt_spec);
SCOPED_TRACE(strings::Substitute("schema: $0", schema.ToString()));
SCOPED_TRACE(strings::Substitute("partition schema: $0", partition_schema.DebugString(schema)));
SCOPED_TRACE(strings::Substitute("partition pruner: $0",
pruner.ToString(schema, partition_schema)));
- SCOPED_TRACE(strings::Substitute("scan spec: $0", spec.ToString(schema)));
+ SCOPED_TRACE(strings::Substitute("optimized scan spec: $0", opt_spec.ToString(schema)));
+ SCOPED_TRACE(strings::Substitute("original scan spec: $0", spec.ToString(schema)));
int pruned_partitions = count_if(partitions.begin(), partitions.end(),
[&] (const Partition& partition) {
@@ -234,8 +242,8 @@ TEST_F(PartitionPrunerTest, TestPartialPrimaryKeyRangePruning) {
// Applies the specified lower and upper bound primary keys against the
// schema, and checks that the expected number of partitions are pruned.
- auto Check = [&] (optional<tuple<int8_t, string>> lower,
- optional<tuple<int8_t, string>> upper,
+ auto Check = [&] (optional<tuple<int8_t, string, string>> lower,
+ optional<tuple<int8_t, string, string>> upper,
size_t remaining_tablets ) {
ScanSpec spec;
KuduPartialRow lower_bound(&schema);
@@ -246,7 +254,7 @@ TEST_F(PartitionPrunerTest, TestPartialPrimaryKeyRangePruning) {
if (lower) {
CHECK_OK(lower_bound.SetInt8("a", get<0>(*lower)));
CHECK_OK(lower_bound.SetStringCopy("b", get<1>(*lower)));
- CHECK_OK(lower_bound.SetStringCopy("c", "fuzz"));
+ CHECK_OK(lower_bound.SetStringCopy("c", get<2>(*lower)));
ConstContiguousRow row(lower_bound.schema(), lower_bound.row_data_);
enc_lower_bound = EncodedKey::FromContiguousRow(row);
spec.SetLowerBoundKey(enc_lower_bound.get());
@@ -254,7 +262,7 @@ TEST_F(PartitionPrunerTest, TestPartialPrimaryKeyRangePruning) {
if (upper) {
CHECK_OK(upper_bound.SetInt8("a", get<0>(*upper)));
CHECK_OK(upper_bound.SetStringCopy("b", get<1>(*upper)));
- CHECK_OK(upper_bound.SetStringCopy("c", "fuzzy"));
+ CHECK_OK(upper_bound.SetStringCopy("c", get<2>(*upper)));
ConstContiguousRow row(upper_bound.schema(), upper_bound.row_data_);
enc_upper_bound = EncodedKey::FromContiguousRow(row);
spec.SetExclusiveUpperBoundKey(enc_upper_bound.get());
@@ -267,46 +275,141 @@ TEST_F(PartitionPrunerTest, TestPartialPrimaryKeyRangePruning) {
// No bounds
Check(boost::none, boost::none, 3);
- // PK < (-1, min, _)
- Check(boost::none, make_tuple<int8_t, string>(-1, ""), 1);
+ // PK < (-1, min, "")
+ Check(boost::none, make_tuple<int8_t, string, string>(-1, "", ""), 1);
+
+ // PK < (10, "r", "")
+ Check(boost::none, make_tuple<int8_t, string, string>(10, "r", ""), 2);
- // PK < (10, "r", _)
- Check(boost::none, make_tuple<int8_t, string>(10, "r"), 2);
+ // PK < (10, "r", "z")
+ Check(boost::none, make_tuple<int8_t, string, string>(10, "r", "z"), 3);
- // PK < (100, min)
- Check(boost::none, make_tuple<int8_t, string>(100, ""), 3);
+ // PK < (100, min, "")
+ Check(boost::none, make_tuple<int8_t, string, string>(100, "", ""), 3);
- // PK >= (-10, "m")
- Check(make_tuple<int8_t, string>(-10, "m"), boost::none, 3);
+ // PK >= (-10, "m", "")
+ Check(make_tuple<int8_t, string, string>(-10, "m", ""), boost::none, 3);
- // PK >= (0, "")
- Check(make_tuple<int8_t, string>(0, ""), boost::none, 3);
+ // PK >= (0, "", "")
+ Check(make_tuple<int8_t, string, string>(0, "", ""), boost::none, 3);
- // PK >= (0, "m")
- Check(make_tuple<int8_t, string>(0, "m"), boost::none, 2);
+ // PK >= (0, "m", "")
+ Check(make_tuple<int8_t, string, string>(0, "m", ""), boost::none, 2);
- // PK >= (100, "")
- Check(make_tuple<int8_t, string>(100, ""), boost::none, 1);
+ // PK >= (100, "", "")
+ Check(make_tuple<int8_t, string, string>(100, "", ""), boost::none, 1);
- // PK >= (-10, 0)
- // PK < (100, 0)
- Check(make_tuple<int8_t, string>(-10, ""),
- make_tuple<int8_t, string>(100, ""), 3);
+ // PK >= (-10, 0, "")
+ // PK < (100, 0, "")
+ Check(make_tuple<int8_t, string, string>(-10, "", ""),
+ make_tuple<int8_t, string, string>(100, "", ""), 3);
- // PK >= (0, "m")
- // PK < (10, "r")
- Check(make_tuple<int8_t, string>(0, "m"),
- make_tuple<int8_t, string>(10, "r"), 1);
+ // PK >= (0, "m", "")
+ // PK < (10, "r", "")
+ Check(make_tuple<int8_t, string, string>(0, "m", ""),
+ make_tuple<int8_t, string, string>(10, "r", ""), 1);
- // PK >= (0, "")
- // PK < (10, "m")
- Check(make_tuple<int8_t, string>(0, ""),
- make_tuple<int8_t, string>(10, "m"), 2);
+ // PK >= (0, "m", "")
+ // PK < (10, "r", "z")
+ Check(make_tuple<int8_t, string, string>(0, "m", ""),
+ make_tuple<int8_t, string, string>(10, "r", "z"), 2);
- // PK >= (10, "m")
- // PK < (10, "m")
- Check(make_tuple<int8_t, string>(10, "m"),
- make_tuple<int8_t, string>(10, "m"), 1);
+ // PK >= (0, "", "")
+ // PK < (10, "m", "z")
+ Check(make_tuple<int8_t, string, string>(0, "", ""),
+ make_tuple<int8_t, string, string>(10, "m", "z"), 2);
+
+ // PK >= (10, "m", "")
+ // PK < (10, "m", "z")
+ Check(make_tuple<int8_t, string, string>(10, "m", ""),
+ make_tuple<int8_t, string, string>(10, "m", "z"), 1);
+}
+
+TEST_F(PartitionPrunerTest, TestIntPartialPrimaryKeyRangePruning) {
+ // CREATE TABLE t
+ // (a INT8, b INT8, c INT8, PRIMARY KEY (a, b, c))
+ // DISTRIBUTE BY RANGE(a, b)
+ // SPLIT ROWS [(0, 0)];
+
+ // Setup the Schema
+ Schema schema({ ColumnSchema("a", INT8),
+ ColumnSchema("b", INT8),
+ ColumnSchema("c", INT8) },
+ { ColumnId(0), ColumnId(1), ColumnId(2) },
+ 3);
+
+ PartitionSchema partition_schema;
+ auto pb = PartitionSchemaPB();
+ auto range_schema = pb.mutable_range_schema();
+ range_schema->add_columns()->set_name("a");
+ range_schema->add_columns()->set_name("b");
+ ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));
+
+ KuduPartialRow split(&schema);
+ ASSERT_OK(split.SetInt8("a", 0));
+ ASSERT_OK(split.SetInt8("b", 0));
+
+ vector<Partition> partitions;
+ ASSERT_OK(partition_schema.CreatePartitions({ split }, {}, schema, &partitions));
+
+ // Applies the specified lower and upper bound primary keys against the
+ // schema, and checks that the expected number of partitions are pruned.
+ auto Check = [&] (optional<tuple<int8_t, int8_t, int8_t>> lower,
+ optional<tuple<int8_t, int8_t, int8_t>> upper,
+ size_t remaining_tablets ) {
+ ScanSpec spec;
+ KuduPartialRow lower_bound(&schema);
+ KuduPartialRow upper_bound(&schema);
+ gscoped_ptr<EncodedKey> enc_lower_bound;
+ gscoped_ptr<EncodedKey> enc_upper_bound;
+
+ if (lower) {
+ CHECK_OK(lower_bound.SetInt8("a", get<0>(*lower)));
+ CHECK_OK(lower_bound.SetInt8("b", get<1>(*lower)));
+ CHECK_OK(lower_bound.SetInt8("c", get<2>(*lower)));
+ ConstContiguousRow row(lower_bound.schema(), lower_bound.row_data_);
+ enc_lower_bound = EncodedKey::FromContiguousRow(row);
+ spec.SetLowerBoundKey(enc_lower_bound.get());
+ }
+ if (upper) {
+ CHECK_OK(upper_bound.SetInt8("a", get<0>(*upper)));
+ CHECK_OK(upper_bound.SetInt8("b", get<1>(*upper)));
+ CHECK_OK(upper_bound.SetInt8("c", get<2>(*upper)));
+ ConstContiguousRow row(upper_bound.schema(), upper_bound.row_data_);
+ enc_upper_bound = EncodedKey::FromContiguousRow(row);
+ spec.SetExclusiveUpperBoundKey(enc_upper_bound.get());
+ }
+ size_t pruner_ranges = remaining_tablets == 0 ? 0 : 1;
+ CheckPrunedPartitions(schema, partition_schema, partitions, spec,
+ remaining_tablets, pruner_ranges);
+ };
+
+ // No bounds
+ Check(boost::none, boost::none, 2);
+
+ // PK < (0, 0, min)
+ Check(boost::none, make_tuple<int8_t, int8_t, int8_t>(0, INT8_MIN, INT8_MIN), 1);
+
+ // PK < (0, 0, 0);
+ Check(boost::none, make_tuple<int8_t, int8_t, int8_t>(0, 0, 0), 2);
+
+ // PK < (0, max, 0);
+ Check(boost::none, make_tuple<int8_t, int8_t, int8_t>(INT8_MAX, INT8_MAX, 0), 2);
+
+ // PK < (max, max, min);
+ Check(boost::none, make_tuple<int8_t, int8_t, int8_t>(INT8_MAX, INT8_MAX, INT8_MIN), 2);
+
+ // PK < (max, max, 0);
+ Check(boost::none, make_tuple<int8_t, int8_t, int8_t>(INT8_MAX, INT8_MAX, 0), 2);
+
+ // PK >= (0, 0, 0);
+ Check(make_tuple<int8_t, int8_t, int8_t>(0, 0, 0), boost::none, 1);
+
+ // PK >= (0, 0, -1);
+ Check(make_tuple<int8_t, int8_t, int8_t>(0, 0, -1), boost::none, 1);
+
+ // PK >= (0, 0, min);
+ Check(make_tuple<int8_t, int8_t, int8_t>(0, 0, INT8_MIN), boost::none, 1);
}
TEST_F(PartitionPrunerTest, TestRangePruning) {
@@ -599,6 +702,7 @@ TEST_F(PartitionPrunerTest, TestInListHashPruning) {
hash_component_3->add_columns()->set_name("c");
hash_component_3->set_num_buckets(3);
hash_component_3->set_seed(0);
+ pb.mutable_range_schema()->clear_columns();
ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));
@@ -686,6 +790,7 @@ TEST_F(PartitionPrunerTest, TestMultiColumnInListHashPruning) {
hash_component_2->add_columns()->set_name("c");
hash_component_2->set_num_buckets(3);
hash_component_2->set_seed(0);
+ pb.mutable_range_schema()->clear_columns();
ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));
@@ -886,4 +991,60 @@ TEST_F(PartitionPrunerTest, TestPruning) {
Check({ ColumnPredicate::Equality(schema.column(2), &ten) },
string("\0\0\0\1", 4), "", 1, 1);
}
+
+TEST_F(PartitionPrunerTest, TestKudu2173) {
+ // CREATE TABLE t
+ // (a INT8, b INT8, PRIMARY KEY (a, b))
+ // DISTRIBUTE BY RANGE(a)
+ // SPLIT ROWS [(10)]
+
+ // Setup the Schema
+ Schema schema({ ColumnSchema("a", INT8),
+ ColumnSchema("b", INT8)},
+ { ColumnId(0), ColumnId(1) },
+ 2);
+
+ PartitionSchema partition_schema;
+ auto pb = PartitionSchemaPB();
+ auto range_schema = pb.mutable_range_schema();
+ range_schema->add_columns()->set_name("a");
+ ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));
+
+ KuduPartialRow split1(&schema);
+ ASSERT_OK(split1.SetInt8("a", 10));
+ vector<Partition> partitions;
+ ASSERT_OK(partition_schema.CreatePartitions({ split1 }, {}, schema, &partitions));
+
+ // Applies the specified predicates to a scan and checks that the expected
+ // number of partitions are pruned.
+ auto Check = [&] (const vector<ColumnPredicate>& predicates, size_t remaining_tablets) {
+ ScanSpec spec;
+
+ for (const auto& pred : predicates) {
+ spec.AddPredicate(pred);
+ }
+ size_t pruner_ranges = remaining_tablets == 0 ? 0 : 1;
+ CheckPrunedPartitions(schema, partition_schema, partitions, spec,
+ remaining_tablets, pruner_ranges);
+ };
+
+ int8_t eleven = 11;
+ int8_t max = INT8_MAX;
+
+ // a < 11
+ Check({ ColumnPredicate::Range(schema.column(0), nullptr, &eleven) }, 2);
+
+ // a < 11 AND b < 11
+ Check({ ColumnPredicate::Range(schema.column(0), nullptr, &eleven),
+ ColumnPredicate::Range(schema.column(1), nullptr, &eleven) },
+ 2);
+
+ // a < max
+ Check({ ColumnPredicate::Range(schema.column(0), nullptr, &max) }, 2);
+
+ // a < max AND b < 11
+ Check({ ColumnPredicate::Range(schema.column(0), nullptr, &max),
+ ColumnPredicate::Range(schema.column(1), nullptr, &eleven) },
+ 2);
+}
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/0c823987/src/kudu/common/partition_pruner.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/partition_pruner.cc b/src/kudu/common/partition_pruner.cc
index a05c927..e3bbebf 100644
--- a/src/kudu/common/partition_pruner.cc
+++ b/src/kudu/common/partition_pruner.cc
@@ -99,8 +99,9 @@ void EncodeRangeKeysFromPrimaryKeyBounds(const Schema& schema,
*range_key_end = scan_spec.exclusive_upper_bound_key()->encoded_key().ToString();
}
} else {
- // The range columns are a prefix of the primary key columns. Copy
- // the column values over to a row, and then encode the row as a range key.
+ // The range-partition key columns are a prefix of the primary key columns.
+ // Copy the column values over to a row, and then encode the row as a range
+ // key.
vector<int32_t> col_idxs(num_range_columns);
iota(col_idxs.begin(), col_idxs.end(), 0);
@@ -123,6 +124,28 @@ void EncodeRangeKeysFromPrimaryKeyBounds(const Schema& schema,
scan_spec.exclusive_upper_bound_key()->raw_keys()[idx],
schema.column(idx).type_info()->size());
}
+
+ // Determine if the upper bound primary key columns which aren't in the
+ // range-partition key are all set to the minimum value. If so, the
+ // range-partition key prefix of the primary key is already effectively an
+ // exclusive bound. If not, then we increment the range-key prefix in
+ // order to transform it from inclusive to exclusive.
+ bool min_suffix = true;
+ for (int32_t idx = num_range_columns; idx < schema.num_key_columns(); idx++) {
+ min_suffix &= schema.column(idx)
+ .type_info()
+ ->IsMinValue(scan_spec.exclusive_upper_bound_key()->raw_keys()[idx]);
+ }
+ Arena arena(std::max<size_t>(Arena::kMinimumChunkSize, schema.key_byte_size()), 4096);
+ if (!min_suffix) {
+ if (!key_util::IncrementPrimaryKey(&row, num_range_columns, &arena)) {
+ // The range-partition key upper bound can't be incremented, which
+ // means it's an inclusive bound on the maximum possible value, so
+ // skip it.
+ return;
+ }
+ }
+
key_util::EncodeKey(col_idxs, row, range_key_end);
}
}
[2/2] kudu git commit: python: replace bespoke minicluster implementation with control shell
Posted by ad...@apache.org.
python: replace bespoke minicluster implementation with control shell
In this case, we're using JSON serialization to talk to the shell because the
Python bindings don't otherwise use protobuf.
I had to update test_scantoken.py to shut down its multiprocessing pool after
using it. Otherwise, its worker subprocesses inherited a copy of the write end
of the control shell's stdin pipe. This led to hangs in stop_cluster(). Closing
the control shell's stdin in the test process didn't close the last open
writer, so the control shell never saw EOF and didn't exit.
Change-Id: I821e864cfe738a4d39ae039b95ca38f16cdcfb82
Reviewed-on: http://gerrit.cloudera.org:8080/8236
Tested-by: Kudu Jenkins
Reviewed-by: Wes McKinney <we...@apache.org>
Reviewed-by: Dan Burkert <da...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/1d928951
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/1d928951
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/1d928951
Branch: refs/heads/master
Commit: 1d928951e375c0cdb6c3c151c5bacb3779805f5f
Parents: 0c82398
Author: Adar Dembo <ad...@cloudera.com>
Authored: Sun Oct 8 18:07:05 2017 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Mon Oct 9 23:52:19 2017 +0000
----------------------------------------------------------------------
python/kudu/tests/common.py | 140 ++++++++++---------------------
python/kudu/tests/test_scantoken.py | 4 +-
2 files changed, 45 insertions(+), 99 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/1d928951/python/kudu/tests/common.py
----------------------------------------------------------------------
diff --git a/python/kudu/tests/common.py b/python/kudu/tests/common.py
index c6b1b7d..9596eaa 100644
--- a/python/kudu/tests/common.py
+++ b/python/kudu/tests/common.py
@@ -19,13 +19,8 @@
from __future__ import division
import json
-import fnmatch
import os
-import shutil
import subprocess
-import tempfile
-import time
-import socket
import kudu
from kudu.client import Partitioning
@@ -42,14 +37,22 @@ class KuduTestBase(object):
tablet servers.
"""
- BASE_PORT = 37000
- NUM_TABLET_SERVERS = 3
- TSERVER_START_TIMEOUT_SECS = 10
NUM_MASTER_SERVERS = 3
+ NUM_TABLET_SERVERS = 3
+
+ @classmethod
+ def send_and_receive(cls, proc, request):
+ binary_req = (json.dumps(request) + "\n").encode("utf-8")
+ proc.stdin.write(binary_req)
+ proc.stdin.flush()
+ binary_resp = proc.stdout.readline()
+ response = json.loads(binary_resp[:-1].decode("utf-8"))
+ if "error" in response:
+ raise Exception("Error in response: {0}".format(response["error"]))
+ return response
@classmethod
def start_cluster(cls):
- local_path = tempfile.mkdtemp(dir=os.getenv("TEST_TMPDIR"))
kudu_build = os.getenv("KUDU_BUILD")
if not kudu_build:
kudu_build = os.path.join(os.getenv("KUDU_HOME"), "build", "latest")
@@ -58,101 +61,42 @@ class KuduTestBase(object):
master_hosts = []
master_ports = []
- # We need to get the port numbers for the masters before starting them
- # so that we can appropriately configure a multi-master.
- for m in range(cls.NUM_MASTER_SERVERS):
- master_hosts.append('127.0.0.1')
- # This introduces a race
- s = socket.socket()
- s.bind(('', 0))
- master_ports.append(s.getsockname()[1])
- s.close()
-
- multi_master_string = ','.join('{0}:{1}'.format(host, port)
- for host, port
- in zip(master_hosts, master_ports))
-
- for m in range(cls.NUM_MASTER_SERVERS):
- os.makedirs("{0}/master/{1}".format(local_path, m))
- os.makedirs("{0}/master/{1}/data".format(local_path, m))
- os.makedirs("{0}/master/{1}/logs".format(local_path, m))
-
-
- path = [
- "{0}/kudu-master".format(bin_path),
- "-unlock_unsafe_flags",
- "-unlock_experimental_flags",
- "-rpc_server_allow_ephemeral_ports",
- "-rpc_bind_addresses=0.0.0.0:{0}".format(master_ports[m]),
- "-fs_wal_dir={0}/master/{1}/data".format(local_path, m),
- "-fs_data_dirs={0}/master/{1}/data".format(local_path, m),
- "-log_dir={0}/master/{1}/logs".format(local_path, m),
- "-logtostderr",
- "-webserver_port=0",
- "-master_addresses={0}".format(multi_master_string),
- # Only make one replica so that our tests don't need to worry about
- # setting consistency modes.
- "-default_num_replicas=1"
- ]
-
- p = subprocess.Popen(path, shell=False)
- fid = open("{0}/master/{1}/kudu-master.pid".format(local_path, m), "w+")
- fid.write("{0}".format(p.pid))
- fid.close()
-
- for m in range(cls.NUM_TABLET_SERVERS):
- os.makedirs("{0}/ts/{1}".format(local_path, m))
- os.makedirs("{0}/ts/{1}/logs".format(local_path, m))
-
- path = [
- "{0}/kudu-tserver".format(bin_path),
- "-unlock_unsafe_flags",
- "-unlock_experimental_flags",
- "-rpc_server_allow_ephemeral_ports",
- "-rpc_bind_addresses=0.0.0.0:0",
- "-tserver_master_addrs={0}".format(multi_master_string),
- "-webserver_port=0",
- "-log_dir={0}/ts/{1}/logs".format(local_path, m),
- "-logtostderr",
- "-fs_data_dirs={0}/ts/{1}/data".format(local_path, m),
- "-fs_wal_dir={0}/ts/{1}/data".format(local_path, m),
- ]
- p = subprocess.Popen(path, shell=False)
- tserver_pid = "{0}/ts/{1}/kudu-tserver.pid".format(local_path, m)
- fid = open(tserver_pid, "w+")
- fid.write("{0}".format(p.pid))
- fid.close()
-
- return local_path, master_hosts, master_ports
+ # Start the mini-cluster control process.
+ args = ["{0}/kudu".format(bin_path), "test", "mini_cluster"]
+ p = subprocess.Popen(args, shell=False,
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE)
+
+ # Create and start a cluster.
+ #
+ # Only make one replica so that our tests don't need to worry about
+ # setting consistency modes.
+ cls.send_and_receive(
+ p, { "create_cluster" :
+ { "numMasters" : cls.NUM_MASTER_SERVERS,
+ "numTservers" : cls.NUM_TABLET_SERVERS,
+ "extraMasterFlags" : [ "--default_num_replicas=1" ]}})
+ cls.send_and_receive(p, { "start_cluster" : {}})
+
+ # Get information about the cluster's masters.
+ masters = cls.send_and_receive(p, { "get_masters" : {}})
+ for m in masters["getMasters"]["masters"]:
+ master_hosts.append(m["boundRpcAddress"]["host"])
+ master_ports.append(m["boundRpcAddress"]["port"])
+
+ return p, master_hosts, master_ports
@classmethod
- def stop_cluster(cls, path):
- for root, dirnames, filenames in os.walk('{0}/..'.format(path)):
- for filename in fnmatch.filter(filenames, '*.pid'):
- with open(os.path.join(root, filename)) as fid:
- a = fid.read()
- r = subprocess.Popen(["kill", "{0}".format(a)])
- r.wait()
- os.remove(os.path.join(root, filename))
- shutil.rmtree(path, True)
+ def stop_cluster(cls):
+ cls.cluster_proc.stdin.close()
+ ret = cls.cluster_proc.wait()
+ if ret != 0:
+ raise Exception("Minicluster process exited with code {0}".format(ret))
@classmethod
def setUpClass(cls):
- cls.cluster_path, cls.master_hosts, cls.master_ports = cls.start_cluster()
- time.sleep(1)
-
+ cls.cluster_proc, cls.master_hosts, cls.master_ports = cls.start_cluster()
cls.client = kudu.connect(cls.master_hosts, cls.master_ports)
- # Wait for all tablet servers to start with the configured timeout
- timeout = time.time() + cls.TSERVER_START_TIMEOUT_SECS
- while len(cls.client.list_tablet_servers()) < cls.NUM_TABLET_SERVERS:
- if time.time() > timeout:
- raise TimeoutError(
- "Tablet servers took too long to start. Timeout set to {}"
- .format(cls.TSERVER_START_TIMEOUT_SECS))
- # Sleep 50 milliseconds to avoid tight-looping rpc
- time.sleep(0.05)
-
cls.schema = cls.example_schema()
cls.partitioning = cls.example_partitioning()
@@ -163,7 +107,7 @@ class KuduTestBase(object):
@classmethod
def tearDownClass(cls):
- cls.stop_cluster(cls.cluster_path)
+ cls.stop_cluster()
@classmethod
def example_schema(cls):
http://git-wip-us.apache.org/repos/asf/kudu/blob/1d928951/python/kudu/tests/test_scantoken.py
----------------------------------------------------------------------
diff --git a/python/kudu/tests/test_scantoken.py b/python/kudu/tests/test_scantoken.py
index 392e2a9..c675a92 100644
--- a/python/kudu/tests/test_scantoken.py
+++ b/python/kudu/tests/test_scantoken.py
@@ -53,8 +53,10 @@ class TestScanToken(TestScanBase):
# Begin process pool
pool = Pool(len(input))
results = pool.map(_get_scan_token_results, input)
+ pool.close()
+ pool.join()
- #Validate results
+ # Validate results
actual_tuples = []
for result in results:
actual_tuples += result