You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by da...@apache.org on 2016/08/16 23:41:18 UTC

[1/2] kudu git commit: Release note for KUDU-1444 incorrectly refers to itself as KUDU-1516

Repository: kudu
Updated Branches:
  refs/heads/master a5a192a48 -> 854b71563


Release note for KUDU-1444 incorrectly refers to itself as KUDU-1516

Change-Id: I4ded9fdf7ed4d732138c3d545f457798684ac94e
Reviewed-on: http://gerrit.cloudera.org:8080/4009
Reviewed-by: Todd Lipcon <to...@apache.org>
Reviewed-by: Misty Stanley-Jones <mi...@apache.org>
Tested-by: Misty Stanley-Jones <mi...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/5239d43c
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/5239d43c
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/5239d43c

Branch: refs/heads/master
Commit: 5239d43cf84dfa84e0d123137e258697c8aea63c
Parents: a5a192a
Author: Misty Stanley-Jones <mi...@apache.org>
Authored: Tue Aug 16 13:53:38 2016 -0700
Committer: Misty Stanley-Jones <mi...@apache.org>
Committed: Tue Aug 16 21:38:28 2016 +0000

----------------------------------------------------------------------
 docs/release_notes.adoc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/5239d43c/docs/release_notes.adoc
----------------------------------------------------------------------
diff --git a/docs/release_notes.adoc b/docs/release_notes.adoc
index 53164c4..0c6b404 100644
--- a/docs/release_notes.adoc
+++ b/docs/release_notes.adoc
@@ -178,7 +178,7 @@ To upgrade to Kudu 0.10.0, see link:installation.html#upgrade[Upgrade from 0.9.x
 - link:https://gerrit.cloudera.org/#/c/3674/[Gerrit #3674] Added LESS and GREATER options for
   column predicates.
 
-- link:https://issues.apache.org/jira/browse/KUDU-1516[KUDU-1516] added support for passing
+- link:https://issues.apache.org/jira/browse/KUDU-1444[KUDU-1444] added support for passing
   back basic per-scan metrics (e.g cache hit rate) from the server to the C++ client. See the
   `KuduScanner::GetResourceMetrics()` API for detailed usage. This feature will be supported
   in the Java client API in a future release.


[2/2] kudu git commit: [c++-client] fix KuduScanTokenBuilder token generation bugs

Posted by da...@apache.org.
[c++-client] fix KuduScanTokenBuilder token generation bugs

This commit fixes two critical issues in the KuduScanTokenBuilder
implementation:

  1. Column predicates are now correctly carried through to the scan token.
     Prior testing didn't catch this because the predicates were being
     transformed into PK bounds, which have always worked correctly. This is
     only an issue on the serialization side, so it doesn't affect scan tokens
     generated by the Java client and deserialized by the C++ client.
  2. Token building now works on tables with non-covering range
     partitions. The fix is mostly copied from scanner-internal, which
     contains very similar logic.

flex_partitioning-itest has been extended to check scan tokens, and the scan
token unit tests have been updated. I also added a unit test for issue 2
(non-covering range partitions) on the Java side to add some confidence that
the Java client is not affected.

Change-Id: Iff3ec3e2399b191c71595c96212471b1e21c7446
Reviewed-on: http://gerrit.cloudera.org:8080/4007
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/854b7156
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/854b7156
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/854b7156

Branch: refs/heads/master
Commit: 854b71563dee5791a3a593947fe1b7233b59cba1
Parents: 5239d43
Author: Dan Burkert <da...@cloudera.com>
Authored: Tue Aug 16 11:45:25 2016 -0700
Committer: Dan Burkert <da...@cloudera.com>
Committed: Tue Aug 16 23:40:39 2016 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/TestKuduClient.java  | 124 ++++++++++++++-----
 src/kudu/client/scan_token-internal.cc          |  26 +++-
 src/kudu/client/scan_token-test.cc              | 110 +++++++++++++++-
 .../flex_partitioning-itest.cc                  |  42 ++++++-
 4 files changed, 265 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/854b7156/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
index e16b657..b65a81b 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
@@ -336,6 +336,45 @@ public class TestKuduClient extends BaseKuduTest {
   }
 
   /**
+   * Counts the rows in the provided scan tokens.
+   */
+  private int countScanTokenRows(List<KuduScanToken> tokens) throws Exception {
+    final AtomicInteger count = new AtomicInteger(0);
+    List<Thread> threads = new ArrayList<>();
+    for (final KuduScanToken token : tokens) {
+      final byte[] serializedToken = token.serialize();
+      Thread thread = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          try (KuduClient contextClient = new KuduClient.KuduClientBuilder(masterAddresses)
+              .defaultAdminOperationTimeoutMs(DEFAULT_SLEEP)
+              .build()) {
+            KuduScanner scanner = KuduScanToken.deserializeIntoScanner(serializedToken, contextClient);
+            try {
+              int localCount = 0;
+              while (scanner.hasMoreRows()) {
+                localCount += Iterators.size(scanner.nextRows());
+              }
+              count.addAndGet(localCount);
+            } finally {
+              scanner.close();
+            }
+          } catch (Exception e) {
+            LOG.error("exception in parallel token scanner", e);
+          }
+        }
+      });
+      thread.run();
+      threads.add(thread);
+    }
+
+    for (Thread thread : threads) {
+      thread.join();
+    }
+    return count.get();
+  }
+
+  /**
    * Tests scan tokens by creating a set of scan tokens, serializing them, and
    * then executing them in parallel with separate client instances. This
    * simulates the normal usecase of scan tokens being created at a central
@@ -370,41 +409,62 @@ public class TestKuduClient extends BaseKuduTest {
     tokenBuilder.setProjectedColumnIndexes(ImmutableList.<Integer>of());
     List<KuduScanToken> tokens = tokenBuilder.build();
     assertEquals(16, tokens.size());
+    assertEquals(100, countScanTokenRows(tokens));
+  }
 
-    final AtomicInteger count = new AtomicInteger(0);
-    List<Thread> threads = new ArrayList<>();
-    for (final KuduScanToken token : tokens) {
-      final byte[] serializedToken = token.serialize();
-      Thread thread = new Thread(new Runnable() {
-        @Override
-        public void run() {
-          try (KuduClient contextClient = new KuduClient.KuduClientBuilder(masterAddresses)
-                                                  .defaultAdminOperationTimeoutMs(DEFAULT_SLEEP)
-                                                  .build()) {
-            KuduScanner scanner = KuduScanToken.deserializeIntoScanner(serializedToken, contextClient);
-            try {
-              int localCount = 0;
-              while (scanner.hasMoreRows()) {
-                localCount += Iterators.size(scanner.nextRows());
-              }
-              assertTrue(localCount > 0);
-              count.addAndGet(localCount);
-            } finally {
-              scanner.close();
-            }
-          } catch (Exception e) {
-            LOG.error("exception in parallel token scanner", e);
-          }
-        }
-      });
-      thread.run();
-      threads.add(thread);
-    }
+  /**
+   * Tests scan token creation and execution on a table with non-covering range partitions.
+   */
+  @Test
+  public void testScanTokensNonCoveringRangePartitions() throws Exception {
+    Schema schema = createManyStringsSchema();
+    CreateTableOptions createOptions = new CreateTableOptions();
+    createOptions.addHashPartitions(ImmutableList.of("key"), 2);
 
-    for (Thread thread : threads) {
-      thread.join();
+    PartialRow lower = schema.newPartialRow();
+    PartialRow upper = schema.newPartialRow();
+    lower.addString("key", "a");
+    upper.addString("key", "f");
+    createOptions.addRangePartition(lower, upper);
+
+    lower = schema.newPartialRow();
+    upper = schema.newPartialRow();
+    lower.addString("key", "h");
+    upper.addString("key", "z");
+    createOptions.addRangePartition(lower, upper);
+
+    PartialRow split = schema.newPartialRow();
+    split.addString("key", "k");
+    createOptions.addSplitRow(split);
+
+    syncClient.createTable(tableName, schema, createOptions);
+
+    KuduSession session = syncClient.newSession();
+    session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
+    KuduTable table = syncClient.openTable(tableName);
+    for (char c = 'a'; c < 'f'; c++) {
+      Insert insert = table.newInsert();
+      PartialRow row = insert.getRow();
+      row.addString("key", "" + c);
+      row.addString("c1", "c1_" + c);
+      row.addString("c2", "c2_" + c);
+      session.apply(insert);
+    }
+    for (char c = 'h'; c < 'z'; c++) {
+      Insert insert = table.newInsert();
+      PartialRow row = insert.getRow();
+      row.addString("key", "" + c);
+      row.addString("c1", "c1_" + c);
+      row.addString("c2", "c2_" + c);
+      session.apply(insert);
     }
-    assertEquals(100, count.get());
+    session.flush();
+
+    KuduScanToken.KuduScanTokenBuilder tokenBuilder = syncClient.newScanTokenBuilder(table);
+    tokenBuilder.setProjectedColumnIndexes(ImmutableList.<Integer>of());
+    List<KuduScanToken> tokens = tokenBuilder.build();
+    assertEquals(6, tokens.size());
+    assertEquals('f' - 'a' + 'z' - 'h', countScanTokenRows(tokens));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kudu/blob/854b7156/src/kudu/client/scan_token-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/scan_token-internal.cc b/src/kudu/client/scan_token-internal.cc
index 2da971b..43a56da 100644
--- a/src/kudu/client/scan_token-internal.cc
+++ b/src/kudu/client/scan_token-internal.cc
@@ -192,6 +192,10 @@ Status KuduScanTokenBuilder::Data::Build(vector<KuduScanToken*>* tokens) {
     pb.clear_upper_bound_primary_key();
   }
 
+  for (const auto& predicate_pair : configuration_.spec().predicates()) {
+    ColumnPredicateToPB(predicate_pair.second, pb.add_column_predicates());
+  }
+
   switch (configuration_.read_mode()) {
     case KuduScanner::READ_LATEST: pb.set_read_mode(kudu::READ_LATEST); break;
     case KuduScanner::READ_AT_SNAPSHOT: pb.set_read_mode(kudu::READ_AT_SNAPSHOT); break;
@@ -218,13 +222,29 @@ Status KuduScanTokenBuilder::Data::Build(vector<KuduScanToken*>* tokens) {
   while (pruner.HasMorePartitionKeyRanges()) {
     scoped_refptr<internal::RemoteTablet> tablet;
     Synchronizer sync;
+    const string& partition_key = pruner.NextPartitionKey();
     client->data_->meta_cache_->LookupTabletByKeyOrNext(table,
-                                                        pruner.NextPartitionKey(),
+                                                        partition_key,
                                                         deadline,
                                                         &tablet,
                                                         sync.AsStatusCallback());
-    RETURN_NOT_OK(sync.Wait());
-    CHECK(tablet);
+    Status s = sync.Wait();
+    if (s.IsNotFound()) {
+      // No more tablets in the table.
+      pruner.RemovePartitionKeyRange("");
+      continue;
+    } else {
+      RETURN_NOT_OK(s);
+    }
+
+    // Check if the meta cache returned a tablet covering a partition key range past
+    // what we asked for. This can happen if the requested partition key falls
+    // in a non-covered range. In this case we can potentially prune the tablet.
+    if (partition_key < tablet->partition().partition_key_start() &&
+        pruner.ShouldPrune(tablet->partition())) {
+      pruner.RemovePartitionKeyRange(tablet->partition().partition_key_end());
+      continue;
+    }
 
     vector<internal::RemoteTabletServer*> remote_tablet_servers;
     tablet->GetRemoteTabletServers(&remote_tablet_servers);

http://git-wip-us.apache.org/repos/asf/kudu/blob/854b7156/src/kudu/client/scan_token-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/scan_token-test.cc b/src/kudu/client/scan_token-test.cc
index f425a71..05bb93d 100644
--- a/src/kudu/client/scan_token-test.cc
+++ b/src/kudu/client/scan_token-test.cc
@@ -15,11 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <atomic>
 #include <memory>
 #include <string>
 #include <thread>
 #include <vector>
-#include <atomic>
 
 #include "kudu/client/client.h"
 #include "kudu/gutil/stl_util.h"
@@ -194,5 +194,113 @@ TEST_F(ScanTokenTest, TestScanTokens) {
   }
 }
 
+TEST_F(ScanTokenTest, TestScanTokensWithNonCoveringRange) {
+  // Create schema
+  KuduSchema schema;
+  {
+    KuduSchemaBuilder builder;
+    builder.AddColumn("col")->NotNull()->Type(KuduColumnSchema::INT64)->PrimaryKey();
+    ASSERT_OK(builder.Build(&schema));
+  }
+
+  // Create table
+  shared_ptr<KuduTable> table;
+  {
+    unique_ptr<KuduPartialRow> lower_bound(schema.NewRow());
+    unique_ptr<KuduPartialRow> upper_bound(schema.NewRow());
+    unique_ptr<client::KuduTableCreator> table_creator(client_->NewTableCreator());
+    table_creator->table_name("table");
+    table_creator->num_replicas(1);
+    table_creator->schema(&schema);
+
+    ASSERT_OK(lower_bound->SetInt64("col", 0));
+    ASSERT_OK(upper_bound->SetInt64("col", 100));
+    table_creator->add_range_partition(lower_bound.release(), upper_bound.release());
+
+    lower_bound.reset(schema.NewRow());
+    upper_bound.reset(schema.NewRow());
+    ASSERT_OK(lower_bound->SetInt64("col", 200));
+    ASSERT_OK(upper_bound->SetInt64("col", 400));
+    table_creator->add_range_partition(lower_bound.release(), upper_bound.release());
+
+    unique_ptr<KuduPartialRow> split(schema.NewRow());
+    ASSERT_OK(split->SetInt64("col", 300));
+    table_creator->add_range_partition_split(split.release());
+    table_creator->add_hash_partitions({ "col" }, 2);
+
+    ASSERT_OK(table_creator->Create());
+    ASSERT_OK(client_->OpenTable("table", &table));
+  }
+
+  // Create session
+  shared_ptr<KuduSession> session = client_->NewSession();
+  session->SetTimeoutMillis(10000);
+  ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+
+  // Insert rows
+  for (int i = 0; i < 100; i++) {
+      unique_ptr<KuduInsert> insert(table->NewInsert());
+      ASSERT_OK(insert->mutable_row()->SetInt64("col", i));
+      ASSERT_OK(session->Apply(insert.release()));
+  }
+  for (int i = 200; i < 400; i++) {
+      unique_ptr<KuduInsert> insert(table->NewInsert());
+      ASSERT_OK(insert->mutable_row()->SetInt64("col", i));
+      ASSERT_OK(session->Apply(insert.release()));
+  }
+  ASSERT_OK(session->Flush());
+
+  { // no predicates
+    vector<KuduScanToken*> tokens;
+    ElementDeleter deleter(&tokens);
+    ASSERT_OK(KuduScanTokenBuilder(table.get()).Build(&tokens));
+
+    ASSERT_EQ(6, tokens.size());
+    ASSERT_EQ(300, CountRows(tokens));
+  }
+
+  { // range predicate
+    vector<KuduScanToken*> tokens;
+    ElementDeleter deleter(&tokens);
+    KuduScanTokenBuilder builder(table.get());
+    unique_ptr<KuduPredicate> predicate(table->NewComparisonPredicate("col",
+                                                                      KuduPredicate::GREATER_EQUAL,
+                                                                      KuduValue::FromInt(200)));
+    ASSERT_OK(builder.AddConjunctPredicate(predicate.release()));
+    ASSERT_OK(builder.Build(&tokens));
+
+    ASSERT_EQ(4, tokens.size());
+    ASSERT_EQ(200, CountRows(tokens));
+  }
+
+  { // equality predicate
+    vector<KuduScanToken*> tokens;
+    ElementDeleter deleter(&tokens);
+    KuduScanTokenBuilder builder(table.get());
+    unique_ptr<KuduPredicate> predicate(table->NewComparisonPredicate("col",
+                                                                      KuduPredicate::EQUAL,
+                                                                      KuduValue::FromInt(42)));
+    ASSERT_OK(builder.AddConjunctPredicate(predicate.release()));
+    ASSERT_OK(builder.Build(&tokens));
+
+    ASSERT_EQ(1, tokens.size());
+    ASSERT_EQ(1, CountRows(std::move(tokens)));
+  }
+
+  { // primary key bound
+    vector<KuduScanToken*> tokens;
+    ElementDeleter deleter(&tokens);
+    KuduScanTokenBuilder builder(table.get());
+    unique_ptr<KuduPartialRow> upper_bound(schema.NewRow());
+    ASSERT_OK(upper_bound->SetInt64("col", 40));
+
+    ASSERT_OK(builder.AddUpperBound(*upper_bound));
+    ASSERT_OK(builder.Build(&tokens));
+
+    ASSERT_EQ(2, tokens.size());
+    ASSERT_EQ(40, CountRows(tokens));
+  }
+}
+
 } // namespace client
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/854b7156/src/kudu/integration-tests/flex_partitioning-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/flex_partitioning-itest.cc b/src/kudu/integration-tests/flex_partitioning-itest.cc
index 35c38ec..2fe9b50 100644
--- a/src/kudu/integration-tests/flex_partitioning-itest.cc
+++ b/src/kudu/integration-tests/flex_partitioning-itest.cc
@@ -43,6 +43,8 @@ using kudu::client::KuduColumnSchema;
 using kudu::client::KuduInsert;
 using kudu::client::KuduPredicate;
 using kudu::client::KuduScanner;
+using kudu::client::KuduScanToken;
+using kudu::client::KuduScanTokenBuilder;
 using kudu::client::KuduSchema;
 using kudu::client::KuduSchemaBuilder;
 using kudu::client::KuduSession;
@@ -265,6 +267,15 @@ class FlexPartitioningITest : public KuduTest {
   // in-memory copy 'inserted_rows_'.
   void CheckScanWithColumnPredicate(Slice col_name, int lower, int upper);
 
+  // Perform a scan via the Scan Token API with a predicate on 'col_name'
+  // BETWEEN 'lower' AND 'upper'. Verifies that the results match up with
+  // 'expected_rows'. Called by CheckScanWithColumnPredicates as an additional
+  // check.
+  void CheckScanTokensWithColumnPredicate(Slice col_name,
+                                          int lower,
+                                          int upper,
+                                          const vector<string>& expected_rows);
+
   // Like the above, but uses the primary key range scan API in the client to
   // scan between 'inserted_rows_[lower]' (inclusive) and 'inserted_rows_[upper]'
   // (exclusive).
@@ -323,7 +334,7 @@ Status FlexPartitioningITest::InsertRows(const RangePartitionOptions& range_part
 
 void FlexPartitioningITest::CheckScanWithColumnPredicate(Slice col_name, int lower, int upper) {
   KuduScanner scanner(table_.get());
-  scanner.SetTimeoutMillis(60000);
+  CHECK_OK(scanner.SetTimeoutMillis(60000));
   CHECK_OK(scanner.AddConjunctPredicate(table_->NewComparisonPredicate(
       col_name, KuduPredicate::GREATER_EQUAL, KuduValue::FromInt(lower))));
   CHECK_OK(scanner.AddConjunctPredicate(table_->NewComparisonPredicate(
@@ -346,6 +357,35 @@ void FlexPartitioningITest::CheckScanWithColumnPredicate(Slice col_name, int low
 
   ASSERT_EQ(expected_rows.size(), rows.size());
   ASSERT_EQ(expected_rows, rows);
+
+  NO_FATALS(CheckScanTokensWithColumnPredicate(col_name, lower, upper, expected_rows));
+}
+
+void FlexPartitioningITest::CheckScanTokensWithColumnPredicate(
+    Slice col_name, int lower, int upper, const vector<string>& expected_rows) {
+  KuduScanTokenBuilder builder(table_.get());
+  CHECK_OK(builder.SetTimeoutMillis(60000));
+
+  CHECK_OK(builder.AddConjunctPredicate(table_->NewComparisonPredicate(
+      col_name, KuduPredicate::GREATER_EQUAL, KuduValue::FromInt(lower))));
+  CHECK_OK(builder.AddConjunctPredicate(table_->NewComparisonPredicate(
+      col_name, KuduPredicate::LESS_EQUAL, KuduValue::FromInt(upper))));
+
+  vector<KuduScanToken*> tokens;
+  ElementDeleter DeleteTable(&tokens);
+  CHECK_OK(builder.Build(&tokens));
+
+  vector<string> rows;
+  for (auto token : tokens) {
+    KuduScanner* scanner_ptr;
+    CHECK_OK(token->IntoKuduScanner(&scanner_ptr));
+    unique_ptr<KuduScanner> scanner(scanner_ptr);
+    ScanToStrings(scanner.get(), &rows);
+  }
+  std::sort(rows.begin(), rows.end());
+
+  ASSERT_EQ(expected_rows.size(), rows.size());
+  ASSERT_EQ(expected_rows, rows);
 }
 
 void FlexPartitioningITest::CheckPKRangeScan(int lower, int upper) {