You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by da...@apache.org on 2017/11/07 19:15:18 UTC
[6/6] kudu git commit: KUDU-1411: Implement HT timestamp propagation
in KuduScanToken
KUDU-1411: Implement HT timestamp propagation in KuduScanToken
This patch adds timestamp propagation in KuduScanToken for the Java client,
so that deserializing a ScanToken results in propagating the timestamp
of the serializer into the deserializer.
Change-Id: If427be16d1fff60271b362134686793ffdbc19f9
Reviewed-on: http://gerrit.cloudera.org:8080/8452
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/9748fd2c
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/9748fd2c
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/9748fd2c
Branch: refs/heads/master
Commit: 9748fd2c802e694d3432e7a111490ac8812c07b5
Parents: 0e3b0ce
Author: hahao <ha...@cloudera.com>
Authored: Wed Nov 1 21:17:20 2017 -0700
Committer: Dan Burkert <da...@apache.org>
Committed: Tue Nov 7 19:14:19 2017 +0000
----------------------------------------------------------------------
.../org/apache/kudu/client/KuduScanToken.java | 16 ++++++++++--
.../kudu/client/TestScannerMultiTablet.java | 27 ++++++++++++++++++++
2 files changed, 41 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/9748fd2c/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
index 1b21b65..2e3d1d6 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@@ -88,6 +89,16 @@ public class KuduScanToken implements Comparable<KuduScanToken> {
* @throws IOException
*/
public byte[] serialize() throws IOException {
+ return serialize(message);
+ }
+
+ /**
+ * Serializes a {@code KuduScanToken} into a byte array.
+ * @return the serialized scan token
+ * @throws IOException
+ */
+ @VisibleForTesting
+ static byte[] serialize(ScanTokenPB message) throws IOException {
byte[] buf = new byte[message.getSerializedSize()];
CodedOutputStream cos = CodedOutputStream.newInstance(buf);
message.writeTo(cos);
@@ -212,8 +223,9 @@ public class KuduScanToken implements Comparable<KuduScanToken> {
}
}
- if (message.hasPropagatedTimestamp()) {
- // TODO (KUDU-1411)
+ if (message.hasPropagatedTimestamp() &&
+ message.getPropagatedTimestamp() != AsyncKuduClient.NO_TIMESTAMP) {
+ client.updateLastPropagatedTimestamp(message.getPropagatedTimestamp());
}
if (message.hasCacheBlocks()) {
http://git-wip-us.apache.org/repos/asf/kudu/blob/9748fd2c/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java
index d978c5b..e81d4bb 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java
@@ -30,6 +30,7 @@ import com.stumbleupon.async.Deferred;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.apache.kudu.client.Client.ScanTokenPB;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
@@ -261,6 +262,32 @@ public class TestScannerMultiTablet extends BaseKuduTest {
assertNotEquals(0, rowCount);
}
+ @Test(timeout = 100000)
+ public void testScanTokenPropagatesTimestamp() throws Exception {
+ // Initially, the client does not have the timestamp set.
+ assertEquals(AsyncKuduClient.NO_TIMESTAMP, client.getLastPropagatedTimestamp());
+ assertEquals(KuduClient.NO_TIMESTAMP, syncClient.getLastPropagatedTimestamp());
+ AsyncKuduScanner scanner = client.newScannerBuilder(table).build();
+ KuduScanner syncScanner = new KuduScanner(scanner);
+
+ // Let the client receive the propagated timestamp in the scanner response.
+ syncScanner.nextRows().getNumRows();
+ final long tsPrev = client.getLastPropagatedTimestamp();
+ final long tsPropagated = tsPrev + 1000000;
+
+ ScanTokenPB.Builder pbBuilder = ScanTokenPB.newBuilder();
+ pbBuilder.setTableName(table.getName());
+ pbBuilder.setPropagatedTimestamp(tsPropagated);
+ Client.ScanTokenPB scanTokenPB = pbBuilder.build();
+ final byte[] serializedToken = KuduScanToken.serialize(scanTokenPB);
+
+ // Deserialize scan tokens and make sure the client's last propagated
+ // timestamp is updated accordingly.
+ assertEquals(tsPrev, client.getLastPropagatedTimestamp());
+ KuduScanToken.deserializeIntoScanner(serializedToken, syncClient);
+ assertEquals(tsPropagated, client.getLastPropagatedTimestamp());
+ }
+
private AsyncKuduScanner getScanner(String lowerBoundKeyOne,
String lowerBoundKeyTwo,
String exclusiveUpperBoundKeyOne,