You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2016/12/01 01:15:09 UTC

[6/9] kudu git commit: KUDU-420 [c++ client] timestamp propagation via scan tokens

KUDU-420 [c++ client] timestamp propagation via scan tokens

Implemented server timestamp propagation via scan tokens for the
Kudu C++ client library. Added corresponding unit test as well.

Besides, this changelist also enables the
ConsistencyITest.TestScanTokenTimestampPropagation test: this patch
brings the necessary fix to make it pass.

This is in the context of the following JIRA item:
KUDU-420 c++ client: implement HT timestamp propagation via scan tokens

Change-Id: I5c76c20b62cb91695c69f7dc4b98f4dc98bf02cc
Reviewed-on: http://gerrit.cloudera.org:8080/5220
Reviewed-by: David Ribeiro Alves <dr...@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/d81aa80a
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/d81aa80a
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/d81aa80a

Branch: refs/heads/master
Commit: d81aa80a98bcec4b928e4a50c3c0a95f09bb57af
Parents: 68dc1ff
Author: Alexey Serbin <as...@cloudera.com>
Authored: Thu Nov 24 14:33:01 2016 -0800
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Wed Nov 30 21:33:34 2016 +0000

----------------------------------------------------------------------
 src/kudu/client/client.h                        |  1 +
 src/kudu/client/scan_token-internal.cc          | 10 +--
 src/kudu/client/scan_token-internal.h           |  1 -
 src/kudu/client/scan_token-test.cc              | 80 ++++++++++++++++++++
 src/kudu/integration-tests/consistency-itest.cc |  2 +-
 5 files changed, 86 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/d81aa80a/src/kudu/client/client.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 9b72f97..506a19f 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -464,6 +464,7 @@ class KUDU_EXPORT KuduClient : public sp::enable_shared_from_this<KuduClient> {
   friend class ClientTest;
   friend class KuduClientBuilder;
   friend class KuduScanner;
+  friend class KuduScanToken;
   friend class KuduScanTokenBuilder;
   friend class KuduSession;
   friend class KuduTable;

http://git-wip-us.apache.org/repos/asf/kudu/blob/d81aa80a/src/kudu/client/scan_token-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/scan_token-internal.cc b/src/kudu/client/scan_token-internal.cc
index c0d327d..678b915 100644
--- a/src/kudu/client/scan_token-internal.cc
+++ b/src/kudu/client/scan_token-internal.cc
@@ -51,9 +51,6 @@ KuduScanToken::Data::Data(KuduTable* table,
       tablet_(std::move(tablet)) {
 }
 
-KuduScanToken::Data::~Data() {
-}
-
 Status KuduScanToken::Data::IntoKuduScanner(KuduScanner** scanner) const {
   return PBIntoScanner(table_->client(), message_, scanner);
 }
@@ -154,12 +151,12 @@ Status KuduScanToken::Data::PBIntoScanner(KuduClient* client,
     RETURN_NOT_OK(scan_builder->SetSnapshotRaw(message.snap_timestamp()));
   }
 
+  RETURN_NOT_OK(scan_builder->SetCacheBlocks(message.cache_blocks()));
+
   if (message.has_propagated_timestamp()) {
-    // TODO(KUDU-420)
+    client->data_->UpdateLatestObservedTimestamp(message.propagated_timestamp());
   }
 
-  RETURN_NOT_OK(scan_builder->SetCacheBlocks(message.cache_blocks()));
-
   *scanner = scan_builder.release();
   return Status::OK();
 }
@@ -219,6 +216,7 @@ Status KuduScanTokenBuilder::Data::Build(vector<KuduScanToken*>* tokens) {
 
   pb.set_cache_blocks(configuration_.spec().cache_blocks());
   pb.set_fault_tolerant(configuration_.is_fault_tolerant());
+  pb.set_propagated_timestamp(client->GetLatestObservedTimestamp());
 
   MonoTime deadline = MonoTime::Now() + client->default_admin_operation_timeout();
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/d81aa80a/src/kudu/client/scan_token-internal.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/scan_token-internal.h b/src/kudu/client/scan_token-internal.h
index 65bef42..dd26517 100644
--- a/src/kudu/client/scan_token-internal.h
+++ b/src/kudu/client/scan_token-internal.h
@@ -33,7 +33,6 @@ class KuduScanToken::Data {
   explicit Data(KuduTable* table,
                 ScanTokenPB message,
                 std::unique_ptr<KuduTablet> tablet);
-  ~Data();
 
   Status IntoKuduScanner(KuduScanner** scanner) const;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/d81aa80a/src/kudu/client/scan_token-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/scan_token-test.cc b/src/kudu/client/scan_token-test.cc
index 2a26b69..8875315 100644
--- a/src/kudu/client/scan_token-test.cc
+++ b/src/kudu/client/scan_token-test.cc
@@ -23,6 +23,7 @@
 #include <vector>
 
 #include "kudu/client/client.h"
+#include "kudu/client/client.pb.h"
 #include "kudu/gutil/stl_util.h"
 #include "kudu/integration-tests/mini_cluster.h"
 #include "kudu/tserver/mini_tablet_server.h"
@@ -322,5 +323,84 @@ TEST_F(ScanTokenTest, TestScanTokensWithNonCoveringRange) {
   }
 }
 
+// When building a scanner from a serialized scan token,
+// verify that the propagated timestamp from the token makes its way into the
+// latest observed timestamp of the client object.
+TEST_F(ScanTokenTest, TestTimestampPropagation) {
+  static const string kTableName = "p_ts_table";
+
+  // Create a table to work with:
+  //   * Deserializing a scan token into a scanner requires the table to exist.
+  //   * Creating a scan token requires the table to exist.
+  shared_ptr<KuduTable> table;
+  {
+    static const string kKeyColumnName = "c_key";
+    KuduSchema schema;
+    {
+      KuduSchemaBuilder builder;
+      builder.AddColumn(kKeyColumnName)->NotNull()->
+          Type(KuduColumnSchema::INT64)->PrimaryKey();
+      ASSERT_OK(builder.Build(&schema));
+    }
+
+    {
+      unique_ptr<KuduPartialRow> split(schema.NewRow());
+      ASSERT_OK(split->SetInt64(kKeyColumnName, 0));
+      unique_ptr<client::KuduTableCreator> creator(client_->NewTableCreator());
+      ASSERT_OK(creator->table_name(kTableName)
+                .schema(&schema)
+                .add_hash_partitions({ kKeyColumnName }, 2)
+                .split_rows({ split.release() })
+                .num_replicas(1)
+                .Create());
+    }
+  }
+
+  // Deserialize a scan token and make sure the client's last observed timestamp
+  // is updated accordingly.
+  {
+    const uint64_t ts_prev = client_->GetLatestObservedTimestamp();
+    const uint64_t ts_propagated = ts_prev + 1000000;
+
+    ScanTokenPB pb;
+    pb.set_table_name(kTableName);
+    pb.set_read_mode(::kudu::READ_AT_SNAPSHOT);
+    pb.set_propagated_timestamp(ts_propagated);
+    const string serialized_token = pb.SerializeAsString();
+    EXPECT_EQ(ts_prev, client_->GetLatestObservedTimestamp());
+
+    KuduScanner* scanner_raw;
+    ASSERT_OK(KuduScanToken::DeserializeIntoScanner(client_.get(),
+                                                    serialized_token,
+                                                    &scanner_raw));
+    // The caller of the DeserializeIntoScanner() is responsible for
+    // de-allocating the result scanner object.
+    unique_ptr<KuduScanner> scanner(scanner_raw);
+    EXPECT_EQ(ts_propagated, client_->GetLatestObservedTimestamp());
+  }
+
+  // Build the set of scan tokens for the table, serialize them and
+  // make sure the serialized tokens contain the propagated timestamp.
+  {
+    ASSERT_OK(client_->OpenTable(kTableName, &table));
+    const uint64_t ts_prev = client_->GetLatestObservedTimestamp();
+    const uint64_t ts_propagated = ts_prev + 1000000;
+
+    client_->SetLatestObservedTimestamp(ts_propagated);
+    vector<KuduScanToken*> tokens;
+    ElementDeleter deleter(&tokens);
+    ASSERT_OK(KuduScanTokenBuilder(table.get()).Build(&tokens));
+    for (const auto* t : tokens) {
+      string serialized_token;
+      ASSERT_OK(t->Serialize(&serialized_token));
+
+      ScanTokenPB pb;
+      ASSERT_TRUE(pb.ParseFromString(serialized_token));
+      ASSERT_TRUE(pb.has_propagated_timestamp());
+      EXPECT_EQ(ts_propagated, pb.propagated_timestamp());
+    }
+  }
+}
+
 } // namespace client
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/d81aa80a/src/kudu/integration-tests/consistency-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/consistency-itest.cc b/src/kudu/integration-tests/consistency-itest.cc
index 9d2a870..a69f128 100644
--- a/src/kudu/integration-tests/consistency-itest.cc
+++ b/src/kudu/integration-tests/consistency-itest.cc
@@ -538,7 +538,7 @@ TEST_F(ConsistencyITest, TestSnapshotScanTimestampReuse) {
 // with the specified timestamp to retrieve the data: it should observe
 // a timestamp which is not less than the propagated timestamp
 // encoded in the token.
-TEST_F(ConsistencyITest, DISABLED_TestScanTokenTimestampPropagation) {
+TEST_F(ConsistencyITest, TestScanTokenTimestampPropagation) {
   const int32_t offset_usec = FLAGS_max_clock_sync_error_usec;
 
   // Need to have at least one row in the first partition with