You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by da...@apache.org on 2016/08/16 23:41:19 UTC

[2/2] kudu git commit: [c++-client] fix KuduScanTokenBuilder token generation bugs

[c++-client] fix KuduScanTokenBuilder token generation bugs

This commit fixes two critical issues in the KuduScanTokenBuilder
implementation:

  1. Column predicates are now correctly carried through to the scan token.
     Prior testing didn't catch this because the predicates were being
     transformed into PK bounds, which have always worked correctly. This is
     only an issue on the serialization side, so it doesn't affect scan tokens
     generated by the Java client and deserialized by the C++ client.
  2. Token building now works on tables with non-covering range
     partitions. The fix is mostly copy/paste from scanner-internal, which is
     very similar.

flex_partitioning-itest has been extended to check scan tokens, and the scan
token unit tests have been updated. I also added a unit test for issue 1 on the
Java side to add some confidence that the Java side is not affected.

Change-Id: Iff3ec3e2399b191c71595c96212471b1e21c7446
Reviewed-on: http://gerrit.cloudera.org:8080/4007
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/854b7156
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/854b7156
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/854b7156

Branch: refs/heads/master
Commit: 854b71563dee5791a3a593947fe1b7233b59cba1
Parents: 5239d43
Author: Dan Burkert <da...@cloudera.com>
Authored: Tue Aug 16 11:45:25 2016 -0700
Committer: Dan Burkert <da...@cloudera.com>
Committed: Tue Aug 16 23:40:39 2016 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/TestKuduClient.java  | 124 ++++++++++++++-----
 src/kudu/client/scan_token-internal.cc          |  26 +++-
 src/kudu/client/scan_token-test.cc              | 110 +++++++++++++++-
 .../flex_partitioning-itest.cc                  |  42 ++++++-
 4 files changed, 265 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/854b7156/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
index e16b657..b65a81b 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
@@ -336,6 +336,45 @@ public class TestKuduClient extends BaseKuduTest {
   }
 
   /**
+   * Counts the rows in the provided scan tokens.
+   */
+  private int countScanTokenRows(List<KuduScanToken> tokens) throws Exception {
+    final AtomicInteger count = new AtomicInteger(0);
+    List<Thread> threads = new ArrayList<>();
+    for (final KuduScanToken token : tokens) {
+      final byte[] serializedToken = token.serialize();
+      Thread thread = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          try (KuduClient contextClient = new KuduClient.KuduClientBuilder(masterAddresses)
+              .defaultAdminOperationTimeoutMs(DEFAULT_SLEEP)
+              .build()) {
+            KuduScanner scanner = KuduScanToken.deserializeIntoScanner(serializedToken, contextClient);
+            try {
+              int localCount = 0;
+              while (scanner.hasMoreRows()) {
+                localCount += Iterators.size(scanner.nextRows());
+              }
+              count.addAndGet(localCount);
+            } finally {
+              scanner.close();
+            }
+          } catch (Exception e) {
+            LOG.error("exception in parallel token scanner", e);
+          }
+        }
+      });
+      thread.run();
+      threads.add(thread);
+    }
+
+    for (Thread thread : threads) {
+      thread.join();
+    }
+    return count.get();
+  }
+
+  /**
    * Tests scan tokens by creating a set of scan tokens, serializing them, and
    * then executing them in parallel with separate client instances. This
    * simulates the normal usecase of scan tokens being created at a central
@@ -370,41 +409,62 @@ public class TestKuduClient extends BaseKuduTest {
     tokenBuilder.setProjectedColumnIndexes(ImmutableList.<Integer>of());
     List<KuduScanToken> tokens = tokenBuilder.build();
     assertEquals(16, tokens.size());
+    assertEquals(100, countScanTokenRows(tokens));
+  }
 
-    final AtomicInteger count = new AtomicInteger(0);
-    List<Thread> threads = new ArrayList<>();
-    for (final KuduScanToken token : tokens) {
-      final byte[] serializedToken = token.serialize();
-      Thread thread = new Thread(new Runnable() {
-        @Override
-        public void run() {
-          try (KuduClient contextClient = new KuduClient.KuduClientBuilder(masterAddresses)
-                                                  .defaultAdminOperationTimeoutMs(DEFAULT_SLEEP)
-                                                  .build()) {
-            KuduScanner scanner = KuduScanToken.deserializeIntoScanner(serializedToken, contextClient);
-            try {
-              int localCount = 0;
-              while (scanner.hasMoreRows()) {
-                localCount += Iterators.size(scanner.nextRows());
-              }
-              assertTrue(localCount > 0);
-              count.addAndGet(localCount);
-            } finally {
-              scanner.close();
-            }
-          } catch (Exception e) {
-            LOG.error("exception in parallel token scanner", e);
-          }
-        }
-      });
-      thread.run();
-      threads.add(thread);
-    }
+  /**
+   * Tests scan token creation and execution on a table with non-covering range partitions.
+   */
+  @Test
+  public void testScanTokensNonCoveringRangePartitions() throws Exception {
+    Schema schema = createManyStringsSchema();
+    CreateTableOptions createOptions = new CreateTableOptions();
+    createOptions.addHashPartitions(ImmutableList.of("key"), 2);
 
-    for (Thread thread : threads) {
-      thread.join();
+    PartialRow lower = schema.newPartialRow();
+    PartialRow upper = schema.newPartialRow();
+    lower.addString("key", "a");
+    upper.addString("key", "f");
+    createOptions.addRangePartition(lower, upper);
+
+    lower = schema.newPartialRow();
+    upper = schema.newPartialRow();
+    lower.addString("key", "h");
+    upper.addString("key", "z");
+    createOptions.addRangePartition(lower, upper);
+
+    PartialRow split = schema.newPartialRow();
+    split.addString("key", "k");
+    createOptions.addSplitRow(split);
+
+    syncClient.createTable(tableName, schema, createOptions);
+
+    KuduSession session = syncClient.newSession();
+    session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
+    KuduTable table = syncClient.openTable(tableName);
+    for (char c = 'a'; c < 'f'; c++) {
+      Insert insert = table.newInsert();
+      PartialRow row = insert.getRow();
+      row.addString("key", "" + c);
+      row.addString("c1", "c1_" + c);
+      row.addString("c2", "c2_" + c);
+      session.apply(insert);
+    }
+    for (char c = 'h'; c < 'z'; c++) {
+      Insert insert = table.newInsert();
+      PartialRow row = insert.getRow();
+      row.addString("key", "" + c);
+      row.addString("c1", "c1_" + c);
+      row.addString("c2", "c2_" + c);
+      session.apply(insert);
     }
-    assertEquals(100, count.get());
+    session.flush();
+
+    KuduScanToken.KuduScanTokenBuilder tokenBuilder = syncClient.newScanTokenBuilder(table);
+    tokenBuilder.setProjectedColumnIndexes(ImmutableList.<Integer>of());
+    List<KuduScanToken> tokens = tokenBuilder.build();
+    assertEquals(6, tokens.size());
+    assertEquals('f' - 'a' + 'z' - 'h', countScanTokenRows(tokens));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kudu/blob/854b7156/src/kudu/client/scan_token-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/scan_token-internal.cc b/src/kudu/client/scan_token-internal.cc
index 2da971b..43a56da 100644
--- a/src/kudu/client/scan_token-internal.cc
+++ b/src/kudu/client/scan_token-internal.cc
@@ -192,6 +192,10 @@ Status KuduScanTokenBuilder::Data::Build(vector<KuduScanToken*>* tokens) {
     pb.clear_upper_bound_primary_key();
   }
 
+  for (const auto& predicate_pair : configuration_.spec().predicates()) {
+    ColumnPredicateToPB(predicate_pair.second, pb.add_column_predicates());
+  }
+
   switch (configuration_.read_mode()) {
     case KuduScanner::READ_LATEST: pb.set_read_mode(kudu::READ_LATEST); break;
     case KuduScanner::READ_AT_SNAPSHOT: pb.set_read_mode(kudu::READ_AT_SNAPSHOT); break;
@@ -218,13 +222,29 @@ Status KuduScanTokenBuilder::Data::Build(vector<KuduScanToken*>* tokens) {
   while (pruner.HasMorePartitionKeyRanges()) {
     scoped_refptr<internal::RemoteTablet> tablet;
     Synchronizer sync;
+    const string& partition_key = pruner.NextPartitionKey();
     client->data_->meta_cache_->LookupTabletByKeyOrNext(table,
-                                                        pruner.NextPartitionKey(),
+                                                        partition_key,
                                                         deadline,
                                                         &tablet,
                                                         sync.AsStatusCallback());
-    RETURN_NOT_OK(sync.Wait());
-    CHECK(tablet);
+    Status s = sync.Wait();
+    if (s.IsNotFound()) {
+      // No more tablets in the table.
+      pruner.RemovePartitionKeyRange("");
+      continue;
+    } else {
+      RETURN_NOT_OK(s);
+    }
+
+    // Check if the meta cache returned a tablet covering a partition key range past
+    // what we asked for. This can happen if the requested partition key falls
+    // in a non-covered range. In this case we can potentially prune the tablet.
+    if (partition_key < tablet->partition().partition_key_start() &&
+        pruner.ShouldPrune(tablet->partition())) {
+      pruner.RemovePartitionKeyRange(tablet->partition().partition_key_end());
+      continue;
+    }
 
     vector<internal::RemoteTabletServer*> remote_tablet_servers;
     tablet->GetRemoteTabletServers(&remote_tablet_servers);

http://git-wip-us.apache.org/repos/asf/kudu/blob/854b7156/src/kudu/client/scan_token-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/scan_token-test.cc b/src/kudu/client/scan_token-test.cc
index f425a71..05bb93d 100644
--- a/src/kudu/client/scan_token-test.cc
+++ b/src/kudu/client/scan_token-test.cc
@@ -15,11 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <atomic>
 #include <memory>
 #include <string>
 #include <thread>
 #include <vector>
-#include <atomic>
 
 #include "kudu/client/client.h"
 #include "kudu/gutil/stl_util.h"
@@ -194,5 +194,113 @@ TEST_F(ScanTokenTest, TestScanTokens) {
   }
 }
 
+TEST_F(ScanTokenTest, TestScanTokensWithNonCoveringRange) {
+  // Create schema
+  KuduSchema schema;
+  {
+    KuduSchemaBuilder builder;
+    builder.AddColumn("col")->NotNull()->Type(KuduColumnSchema::INT64)->PrimaryKey();
+    ASSERT_OK(builder.Build(&schema));
+  }
+
+  // Create table
+  shared_ptr<KuduTable> table;
+  {
+    unique_ptr<KuduPartialRow> lower_bound(schema.NewRow());
+    unique_ptr<KuduPartialRow> upper_bound(schema.NewRow());
+    unique_ptr<client::KuduTableCreator> table_creator(client_->NewTableCreator());
+    table_creator->table_name("table");
+    table_creator->num_replicas(1);
+    table_creator->schema(&schema);
+
+    ASSERT_OK(lower_bound->SetInt64("col", 0));
+    ASSERT_OK(upper_bound->SetInt64("col", 100));
+    table_creator->add_range_partition(lower_bound.release(), upper_bound.release());
+
+    lower_bound.reset(schema.NewRow());
+    upper_bound.reset(schema.NewRow());
+    ASSERT_OK(lower_bound->SetInt64("col", 200));
+    ASSERT_OK(upper_bound->SetInt64("col", 400));
+    table_creator->add_range_partition(lower_bound.release(), upper_bound.release());
+
+    unique_ptr<KuduPartialRow> split(schema.NewRow());
+    ASSERT_OK(split->SetInt64("col", 300));
+    table_creator->add_range_partition_split(split.release());
+    table_creator->add_hash_partitions({ "col" }, 2);
+
+    ASSERT_OK(table_creator->Create());
+    ASSERT_OK(client_->OpenTable("table", &table));
+  }
+
+  // Create session
+  shared_ptr<KuduSession> session = client_->NewSession();
+  session->SetTimeoutMillis(10000);
+  ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+
+  // Insert rows
+  for (int i = 0; i < 100; i++) {
+      unique_ptr<KuduInsert> insert(table->NewInsert());
+      ASSERT_OK(insert->mutable_row()->SetInt64("col", i));
+      ASSERT_OK(session->Apply(insert.release()));
+  }
+  for (int i = 200; i < 400; i++) {
+      unique_ptr<KuduInsert> insert(table->NewInsert());
+      ASSERT_OK(insert->mutable_row()->SetInt64("col", i));
+      ASSERT_OK(session->Apply(insert.release()));
+  }
+  ASSERT_OK(session->Flush());
+
+  { // no predicates
+    vector<KuduScanToken*> tokens;
+    ElementDeleter deleter(&tokens);
+    ASSERT_OK(KuduScanTokenBuilder(table.get()).Build(&tokens));
+
+    ASSERT_EQ(6, tokens.size());
+    ASSERT_EQ(300, CountRows(tokens));
+  }
+
+  { // range predicate
+    vector<KuduScanToken*> tokens;
+    ElementDeleter deleter(&tokens);
+    KuduScanTokenBuilder builder(table.get());
+    unique_ptr<KuduPredicate> predicate(table->NewComparisonPredicate("col",
+                                                                      KuduPredicate::GREATER_EQUAL,
+                                                                      KuduValue::FromInt(200)));
+    ASSERT_OK(builder.AddConjunctPredicate(predicate.release()));
+    ASSERT_OK(builder.Build(&tokens));
+
+    ASSERT_EQ(4, tokens.size());
+    ASSERT_EQ(200, CountRows(tokens));
+  }
+
+  { // equality predicate
+    vector<KuduScanToken*> tokens;
+    ElementDeleter deleter(&tokens);
+    KuduScanTokenBuilder builder(table.get());
+    unique_ptr<KuduPredicate> predicate(table->NewComparisonPredicate("col",
+                                                                      KuduPredicate::EQUAL,
+                                                                      KuduValue::FromInt(42)));
+    ASSERT_OK(builder.AddConjunctPredicate(predicate.release()));
+    ASSERT_OK(builder.Build(&tokens));
+
+    ASSERT_EQ(1, tokens.size());
+    ASSERT_EQ(1, CountRows(std::move(tokens)));
+  }
+
+  { // primary key bound
+    vector<KuduScanToken*> tokens;
+    ElementDeleter deleter(&tokens);
+    KuduScanTokenBuilder builder(table.get());
+    unique_ptr<KuduPartialRow> upper_bound(schema.NewRow());
+    ASSERT_OK(upper_bound->SetInt64("col", 40));
+
+    ASSERT_OK(builder.AddUpperBound(*upper_bound));
+    ASSERT_OK(builder.Build(&tokens));
+
+    ASSERT_EQ(2, tokens.size());
+    ASSERT_EQ(40, CountRows(tokens));
+  }
+}
+
 } // namespace client
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/854b7156/src/kudu/integration-tests/flex_partitioning-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/flex_partitioning-itest.cc b/src/kudu/integration-tests/flex_partitioning-itest.cc
index 35c38ec..2fe9b50 100644
--- a/src/kudu/integration-tests/flex_partitioning-itest.cc
+++ b/src/kudu/integration-tests/flex_partitioning-itest.cc
@@ -43,6 +43,8 @@ using kudu::client::KuduColumnSchema;
 using kudu::client::KuduInsert;
 using kudu::client::KuduPredicate;
 using kudu::client::KuduScanner;
+using kudu::client::KuduScanToken;
+using kudu::client::KuduScanTokenBuilder;
 using kudu::client::KuduSchema;
 using kudu::client::KuduSchemaBuilder;
 using kudu::client::KuduSession;
@@ -265,6 +267,15 @@ class FlexPartitioningITest : public KuduTest {
   // in-memory copy 'inserted_rows_'.
   void CheckScanWithColumnPredicate(Slice col_name, int lower, int upper);
 
+  // Perform a scan via the Scan Token API with a predicate on 'col_name'
+  // BETWEEN 'lower' AND 'upper'. Verifies that the results match up with
+  // 'expected_rows'. Called by CheckScanWithColumnPredicate as an additional
+  // check.
+  void CheckScanTokensWithColumnPredicate(Slice col_name,
+                                          int lower,
+                                          int upper,
+                                          const vector<string>& expected_rows);
+
   // Like the above, but uses the primary key range scan API in the client to
   // scan between 'inserted_rows_[lower]' (inclusive) and 'inserted_rows_[upper]'
   // (exclusive).
@@ -323,7 +334,7 @@ Status FlexPartitioningITest::InsertRows(const RangePartitionOptions& range_part
 
 void FlexPartitioningITest::CheckScanWithColumnPredicate(Slice col_name, int lower, int upper) {
   KuduScanner scanner(table_.get());
-  scanner.SetTimeoutMillis(60000);
+  CHECK_OK(scanner.SetTimeoutMillis(60000));
   CHECK_OK(scanner.AddConjunctPredicate(table_->NewComparisonPredicate(
       col_name, KuduPredicate::GREATER_EQUAL, KuduValue::FromInt(lower))));
   CHECK_OK(scanner.AddConjunctPredicate(table_->NewComparisonPredicate(
@@ -346,6 +357,35 @@ void FlexPartitioningITest::CheckScanWithColumnPredicate(Slice col_name, int low
 
   ASSERT_EQ(expected_rows.size(), rows.size());
   ASSERT_EQ(expected_rows, rows);
+
+  NO_FATALS(CheckScanTokensWithColumnPredicate(col_name, lower, upper, expected_rows));
+}
+
+void FlexPartitioningITest::CheckScanTokensWithColumnPredicate(
+    Slice col_name, int lower, int upper, const vector<string>& expected_rows) {
+  KuduScanTokenBuilder builder(table_.get());
+  CHECK_OK(builder.SetTimeoutMillis(60000));
+
+  CHECK_OK(builder.AddConjunctPredicate(table_->NewComparisonPredicate(
+      col_name, KuduPredicate::GREATER_EQUAL, KuduValue::FromInt(lower))));
+  CHECK_OK(builder.AddConjunctPredicate(table_->NewComparisonPredicate(
+      col_name, KuduPredicate::LESS_EQUAL, KuduValue::FromInt(upper))));
+
+  vector<KuduScanToken*> tokens;
+  ElementDeleter DeleteTable(&tokens);
+  CHECK_OK(builder.Build(&tokens));
+
+  vector<string> rows;
+  for (auto token : tokens) {
+    KuduScanner* scanner_ptr;
+    CHECK_OK(token->IntoKuduScanner(&scanner_ptr));
+    unique_ptr<KuduScanner> scanner(scanner_ptr);
+    ScanToStrings(scanner.get(), &rows);
+  }
+  std::sort(rows.begin(), rows.end());
+
+  ASSERT_EQ(expected_rows.size(), rows.size());
+  ASSERT_EQ(expected_rows, rows);
 }
 
 void FlexPartitioningITest::CheckPKRangeScan(int lower, int upper) {