You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2016/12/01 01:15:09 UTC
[6/9] kudu git commit: KUDU-420 [c++ client] timestamp propagation
via scan tokens
KUDU-420 [c++ client] timestamp propagation via scan tokens
Implemented server timestamp propagation via scan tokens for the
Kudu C++ client library. Added corresponding unit test as well.
Besides, this changelist also enables the
ConsistencyITest.TestScanTokenTimestampPropagation test: this patch
brings the necessary fix to make it pass.
This is in the context of the following JIRA item:
KUDU-420 c++ client: implement HT timestamp propagation via scan tokens
Change-Id: I5c76c20b62cb91695c69f7dc4b98f4dc98bf02cc
Reviewed-on: http://gerrit.cloudera.org:8080/5220
Reviewed-by: David Ribeiro Alves <dr...@apache.org>
Tested-by: Kudu Jenkins
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/d81aa80a
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/d81aa80a
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/d81aa80a
Branch: refs/heads/master
Commit: d81aa80a98bcec4b928e4a50c3c0a95f09bb57af
Parents: 68dc1ff
Author: Alexey Serbin <as...@cloudera.com>
Authored: Thu Nov 24 14:33:01 2016 -0800
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Wed Nov 30 21:33:34 2016 +0000
----------------------------------------------------------------------
src/kudu/client/client.h | 1 +
src/kudu/client/scan_token-internal.cc | 10 +--
src/kudu/client/scan_token-internal.h | 1 -
src/kudu/client/scan_token-test.cc | 80 ++++++++++++++++++++
src/kudu/integration-tests/consistency-itest.cc | 2 +-
5 files changed, 86 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/d81aa80a/src/kudu/client/client.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 9b72f97..506a19f 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -464,6 +464,7 @@ class KUDU_EXPORT KuduClient : public sp::enable_shared_from_this<KuduClient> {
friend class ClientTest;
friend class KuduClientBuilder;
friend class KuduScanner;
+ friend class KuduScanToken;
friend class KuduScanTokenBuilder;
friend class KuduSession;
friend class KuduTable;
http://git-wip-us.apache.org/repos/asf/kudu/blob/d81aa80a/src/kudu/client/scan_token-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/scan_token-internal.cc b/src/kudu/client/scan_token-internal.cc
index c0d327d..678b915 100644
--- a/src/kudu/client/scan_token-internal.cc
+++ b/src/kudu/client/scan_token-internal.cc
@@ -51,9 +51,6 @@ KuduScanToken::Data::Data(KuduTable* table,
tablet_(std::move(tablet)) {
}
-KuduScanToken::Data::~Data() {
-}
-
Status KuduScanToken::Data::IntoKuduScanner(KuduScanner** scanner) const {
return PBIntoScanner(table_->client(), message_, scanner);
}
@@ -154,12 +151,12 @@ Status KuduScanToken::Data::PBIntoScanner(KuduClient* client,
RETURN_NOT_OK(scan_builder->SetSnapshotRaw(message.snap_timestamp()));
}
+ RETURN_NOT_OK(scan_builder->SetCacheBlocks(message.cache_blocks()));
+
if (message.has_propagated_timestamp()) {
- // TODO(KUDU-420)
+ client->data_->UpdateLatestObservedTimestamp(message.propagated_timestamp());
}
- RETURN_NOT_OK(scan_builder->SetCacheBlocks(message.cache_blocks()));
-
*scanner = scan_builder.release();
return Status::OK();
}
@@ -219,6 +216,7 @@ Status KuduScanTokenBuilder::Data::Build(vector<KuduScanToken*>* tokens) {
pb.set_cache_blocks(configuration_.spec().cache_blocks());
pb.set_fault_tolerant(configuration_.is_fault_tolerant());
+ pb.set_propagated_timestamp(client->GetLatestObservedTimestamp());
MonoTime deadline = MonoTime::Now() + client->default_admin_operation_timeout();
http://git-wip-us.apache.org/repos/asf/kudu/blob/d81aa80a/src/kudu/client/scan_token-internal.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/scan_token-internal.h b/src/kudu/client/scan_token-internal.h
index 65bef42..dd26517 100644
--- a/src/kudu/client/scan_token-internal.h
+++ b/src/kudu/client/scan_token-internal.h
@@ -33,7 +33,6 @@ class KuduScanToken::Data {
explicit Data(KuduTable* table,
ScanTokenPB message,
std::unique_ptr<KuduTablet> tablet);
- ~Data();
Status IntoKuduScanner(KuduScanner** scanner) const;
http://git-wip-us.apache.org/repos/asf/kudu/blob/d81aa80a/src/kudu/client/scan_token-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/scan_token-test.cc b/src/kudu/client/scan_token-test.cc
index 2a26b69..8875315 100644
--- a/src/kudu/client/scan_token-test.cc
+++ b/src/kudu/client/scan_token-test.cc
@@ -23,6 +23,7 @@
#include <vector>
#include "kudu/client/client.h"
+#include "kudu/client/client.pb.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/integration-tests/mini_cluster.h"
#include "kudu/tserver/mini_tablet_server.h"
@@ -322,5 +323,84 @@ TEST_F(ScanTokenTest, TestScanTokensWithNonCoveringRange) {
}
}
+// When building a scanner from a serialized scan token,
+// verify that the propagated timestamp from the token makes its way into the
+// latest observed timestamp of the client object.
+TEST_F(ScanTokenTest, TestTimestampPropagation) {
+ static const string kTableName = "p_ts_table";
+
+ // Create a table to work with:
+ // * Deserializing a scan token into a scanner requires the table to exist.
+ // * Creating a scan token requires the table to exist.
+ shared_ptr<KuduTable> table;
+ {
+ static const string kKeyColumnName = "c_key";
+ KuduSchema schema;
+ {
+ KuduSchemaBuilder builder;
+ builder.AddColumn(kKeyColumnName)->NotNull()->
+ Type(KuduColumnSchema::INT64)->PrimaryKey();
+ ASSERT_OK(builder.Build(&schema));
+ }
+
+ {
+ unique_ptr<KuduPartialRow> split(schema.NewRow());
+ ASSERT_OK(split->SetInt64(kKeyColumnName, 0));
+ unique_ptr<client::KuduTableCreator> creator(client_->NewTableCreator());
+ ASSERT_OK(creator->table_name(kTableName)
+ .schema(&schema)
+ .add_hash_partitions({ kKeyColumnName }, 2)
+ .split_rows({ split.release() })
+ .num_replicas(1)
+ .Create());
+ }
+ }
+
+ // Deserialize a scan token and make sure the client's last observed timestamp
+ // is updated accordingly.
+ {
+ const uint64_t ts_prev = client_->GetLatestObservedTimestamp();
+ const uint64_t ts_propagated = ts_prev + 1000000;
+
+ ScanTokenPB pb;
+ pb.set_table_name(kTableName);
+ pb.set_read_mode(::kudu::READ_AT_SNAPSHOT);
+ pb.set_propagated_timestamp(ts_propagated);
+ const string serialized_token = pb.SerializeAsString();
+ EXPECT_EQ(ts_prev, client_->GetLatestObservedTimestamp());
+
+ KuduScanner* scanner_raw;
+ ASSERT_OK(KuduScanToken::DeserializeIntoScanner(client_.get(),
+ serialized_token,
+ &scanner_raw));
+ // The caller of the DeserializeIntoScanner() is responsible for
+ // de-allocating the result scanner object.
+ unique_ptr<KuduScanner> scanner(scanner_raw);
+ EXPECT_EQ(ts_propagated, client_->GetLatestObservedTimestamp());
+ }
+
+ // Build the set of scan tokens for the table, serialize them and
+ // make sure the serialized tokens contain the propagated timestamp.
+ {
+ ASSERT_OK(client_->OpenTable(kTableName, &table));
+ const uint64_t ts_prev = client_->GetLatestObservedTimestamp();
+ const uint64_t ts_propagated = ts_prev + 1000000;
+
+ client_->SetLatestObservedTimestamp(ts_propagated);
+ vector<KuduScanToken*> tokens;
+ ElementDeleter deleter(&tokens);
+ ASSERT_OK(KuduScanTokenBuilder(table.get()).Build(&tokens));
+ for (const auto* t : tokens) {
+ string serialized_token;
+ ASSERT_OK(t->Serialize(&serialized_token));
+
+ ScanTokenPB pb;
+ ASSERT_TRUE(pb.ParseFromString(serialized_token));
+ ASSERT_TRUE(pb.has_propagated_timestamp());
+ EXPECT_EQ(ts_propagated, pb.propagated_timestamp());
+ }
+ }
+}
+
} // namespace client
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/d81aa80a/src/kudu/integration-tests/consistency-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/consistency-itest.cc b/src/kudu/integration-tests/consistency-itest.cc
index 9d2a870..a69f128 100644
--- a/src/kudu/integration-tests/consistency-itest.cc
+++ b/src/kudu/integration-tests/consistency-itest.cc
@@ -538,7 +538,7 @@ TEST_F(ConsistencyITest, TestSnapshotScanTimestampReuse) {
// with the specified timestamp to retrieve the data: it should observe
// a timestamp which is not less than the propagated timestamp
// encoded in the token.
-TEST_F(ConsistencyITest, DISABLED_TestScanTokenTimestampPropagation) {
+TEST_F(ConsistencyITest, TestScanTokenTimestampPropagation) {
const int32_t offset_usec = FLAGS_max_clock_sync_error_usec;
// Need to have at least one row in the first partition with