You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2016/12/01 01:15:08 UTC

[5/9] kudu git commit: KUDU-420 [i-tests] scan token timestamp propagation test

KUDU-420 [i-tests] scan token timestamp propagation test

Added an integration test to verify timestamp propagation via scan
tokens for the C++ client.

The test is disabled as it currently fails. A follow-up patch will fix
the bug and enable the test.

This is in the context of the following JIRA item:
KUDU-420 c++ client: implement HT timestamp propagation via scan tokens

Change-Id: I47cd067248f4a26c4605f075ec5ee30da71f6f30
Reviewed-on: http://gerrit.cloudera.org:8080/5219
Reviewed-by: David Ribeiro Alves <dr...@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/68dc1ffe
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/68dc1ffe
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/68dc1ffe

Branch: refs/heads/master
Commit: 68dc1ffeff2f6ad7ad8654da0df66162cb286cca
Parents: b4764f9
Author: Alexey Serbin <as...@cloudera.com>
Authored: Thu Nov 24 14:24:57 2016 -0800
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Wed Nov 30 21:33:27 2016 +0000

----------------------------------------------------------------------
 src/kudu/integration-tests/consistency-itest.cc | 111 ++++++++++++++++++-
 1 file changed, 107 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/68dc1ffe/src/kudu/integration-tests/consistency-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/consistency-itest.cc b/src/kudu/integration-tests/consistency-itest.cc
index 45efeb9..9d2a870 100644
--- a/src/kudu/integration-tests/consistency-itest.cc
+++ b/src/kudu/integration-tests/consistency-itest.cc
@@ -295,7 +295,8 @@ TEST_F(ConsistencyITest, TestTimestampPropagationFromScans) {
         0, MonoDelta::FromMilliseconds(100)));
 
     // Insert data into the first tablet (a.k.a. Ta).
-    ASSERT_OK(InsertTestRows(client.get(), table.get(), key_split_value_, 0));
+    const int rows_num = key_split_value_;  // fill in the partition completely
+    ASSERT_OK(InsertTestRows(client.get(), table.get(), rows_num, 0));
     size_t row_count;
     ASSERT_OK(GetRowCount(table.get(), KuduScanner::READ_LATEST, 0,
                           &row_count));
@@ -324,8 +325,9 @@ TEST_F(ConsistencyITest, TestTimestampPropagationFromScans) {
 
     // Inserting data into the second tablet (a.k.a. Tb): using the second
     // key range partition.
+    const int rows_num = key_split_value_;
     ASSERT_OK(InsertTestRows(client.get(), table.get(),
-                             key_split_value_, key_split_value_));
+                             rows_num, key_split_value_));
     // Retrieve the latest observed timestamp.
     ts_b = client->GetLatestObservedTimestamp();
   }
@@ -358,8 +360,8 @@ TEST_F(ConsistencyITest, TestTimestampPropagationFromScans) {
 // when scanning following tablets.
 //
 // The idea of the test is simple: have a scan spanned across two tablets
-// where the clocks of the corresponding tablet servers are skewed. The scenario
-// is as following:
+// where the clocks of the corresponding tablet servers are skewed. The sequence
+// of actions is as follows:
 //
 //   1. Create a table which spans across two tablets.
 //
@@ -516,5 +518,106 @@ TEST_F(ConsistencyITest, TestSnapshotScanTimestampReuse) {
   }
 }
 
+// Verify that the propagated timestamp from a serialized scan token
+// makes its way into the corresponding tablet servers while performing
+// a scan operation built from the token.
+//
+// The real-world use cases behind this test assume a Kudu client (C++/Java)
+// can get scan tokens for a scan operation, serialize those and pass them
+// to another Kudu client (C++/Java). Since de-serializing a scan token
+// propagates the latest observed timestamp, the latter client will have
+// the latest observed timestamp set accordingly if it de-serializes those
+// scan tokens into corresponding scan operations.
+//
+// The test scenario uses a table split into two tablets, each hosted by a
+// tablet server. The clock of the first tablet server is shifted into the
+// future. The first client inserts a row into the first tablet. Then it creates
+// a scan token to retrieve some "related" data from the second
+// tablet hosted by the second server. Now, another client receives the
+// serialized scan token and runs the corresponding READ_AT_SNAPSHOT scan
+// with the specified timestamp to retrieve the data: it should observe
+// a timestamp which is not less than the propagated timestamp
+// encoded in the token.
+TEST_F(ConsistencyITest, DISABLED_TestScanTokenTimestampPropagation) {
+  const int32_t offset_usec = FLAGS_max_clock_sync_error_usec;
+
+  // Need to have at least one row in the first partition with
+  // values starting at 0.
+  ASSERT_GE(key_split_value_, 1);
+
+  {
+    // Prepare the setup: create the table and populate it with initial
+    // data. The tablet server clock is adjusted in the next block.
+    shared_ptr<KuduClient> client;
+    ASSERT_OK(cluster_->CreateClient(nullptr, &client));
+    ASSERT_OK(CreateTable(client.get(), table_name_));
+    shared_ptr<KuduTable> table;
+    ASSERT_OK(client->OpenTable(table_name_, &table));
+
+    // Insert a single row into the second tablet: it's necessary to get
+    // a non-empty scan in the verification phase of the test.
+    ASSERT_OK(InsertTestRows(client.get(), table.get(), 1, key_split_value_));
+  }
+
+  uint64_t ts_ref;
+  string scan_token;
+  {
+    shared_ptr<KuduClient> client;
+    ASSERT_OK(cluster_->CreateClient(nullptr, &client));
+
+    // Advance the clock of the server hosting the first partition tablet.
+    const int32_t row_key = 0;
+    ASSERT_OK(UpdateClockForTabletHostingKey(
+        row_key, MonoDelta::FromMicroseconds(offset_usec)));
+    shared_ptr<KuduTable> table;
+    ASSERT_OK(client->OpenTable(table_name_, &table));
+    // Insert just a single row into the first tablet.
+    ASSERT_OK(InsertTestRows(client.get(), table.get(), 1, row_key));
+    ts_ref = client->GetLatestObservedTimestamp();
+
+    // Create and serialize a scan token: the scan selects a row by its key
+    // from the other tablet at the timestamp at which the first row was
+    // inserted.
+    vector<KuduScanToken*> tokens;
+    ElementDeleter deleter(&tokens);
+    KuduScanTokenBuilder builder(table.get());
+    ASSERT_OK(builder.SetReadMode(KuduScanner::READ_AT_SNAPSHOT));
+    unique_ptr<KuduPredicate> predicate(table->NewComparisonPredicate(
+        key_column_name_,
+        KuduPredicate::EQUAL,
+        KuduValue::FromInt(key_split_value_)));
+    ASSERT_OK(builder.AddConjunctPredicate(predicate.release()));
+    ASSERT_OK(builder.Build(&tokens));
+    ASSERT_EQ(1, tokens.size());
+    ASSERT_OK(tokens[0]->Serialize(&scan_token));
+  }
+
+  // The other client: scan the second tablet using a scanner built from
+  // the serialized scan token. If the client propagates the timestamp from
+  // the de-serialized scan token, upon fetching a batch of rows the client
+  // should observe a timestamp not less than the reference timestamp
+  // (ts_ref) captured by the first client after its insert.
+  {
+    shared_ptr<KuduClient> client;
+    ASSERT_OK(cluster_->CreateClient(nullptr, &client));
+    shared_ptr<KuduTable> table;
+    ASSERT_OK(client->OpenTable(table_name_, &table));
+    KuduScanner* scanner_raw;
+    ASSERT_OK(KuduScanToken::DeserializeIntoScanner(client.get(), scan_token,
+                                                    &scanner_raw));
+    unique_ptr<KuduScanner> scanner(scanner_raw);
+    ASSERT_OK(scanner->Open());
+    ASSERT_TRUE(scanner->HasMoreRows());
+    size_t row_count = 0;
+    while (scanner->HasMoreRows()) {
+      KuduScanBatch batch;
+      ASSERT_OK(scanner->NextBatch(&batch));
+      row_count += batch.NumRows();
+      ASSERT_LE(ts_ref, client->GetLatestObservedTimestamp());
+    }
+    EXPECT_EQ(1, row_count);
+  }
+}
+
 } // namespace client
 } // namespace kudu