You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by da...@apache.org on 2017/11/07 19:15:18 UTC

[6/6] kudu git commit: KUDU-1411: Implement HT timestamp propagation in KuduScanToken

KUDU-1411: Implement HT timestamp propagation in KuduScanToken

This patch adds timpestamp propagation in KuduScanToken for java client,
so that deserializing a ScanToken results in a propagating the timestamp
of the serializer into the deserializer.

Change-Id: If427be16d1fff60271b362134686793ffdbc19f9
Reviewed-on: http://gerrit.cloudera.org:8080/8452
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/9748fd2c
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/9748fd2c
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/9748fd2c

Branch: refs/heads/master
Commit: 9748fd2c802e694d3432e7a111490ac8812c07b5
Parents: 0e3b0ce
Author: hahao <ha...@cloudera.com>
Authored: Wed Nov 1 21:17:20 2017 -0700
Committer: Dan Burkert <da...@apache.org>
Committed: Tue Nov 7 19:14:19 2017 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/KuduScanToken.java   | 16 ++++++++++--
 .../kudu/client/TestScannerMultiTablet.java     | 27 ++++++++++++++++++++
 2 files changed, 41 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/9748fd2c/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
index 1b21b65..2e3d1d6 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
@@ -88,6 +89,16 @@ public class KuduScanToken implements Comparable<KuduScanToken> {
    * @throws IOException
    */
   public byte[] serialize() throws IOException {
+    return serialize(message);
+  }
+
+  /**
+   * Serializes a {@code KuduScanToken} into a byte array.
+   * @return the serialized scan token
+   * @throws IOException
+   */
+  @VisibleForTesting
+  static byte[] serialize(ScanTokenPB message) throws IOException {
     byte[] buf = new byte[message.getSerializedSize()];
     CodedOutputStream cos = CodedOutputStream.newInstance(buf);
     message.writeTo(cos);
@@ -212,8 +223,9 @@ public class KuduScanToken implements Comparable<KuduScanToken> {
       }
     }
 
-    if (message.hasPropagatedTimestamp()) {
-      // TODO (KUDU-1411)
+    if (message.hasPropagatedTimestamp() &&
+        message.getPropagatedTimestamp() != AsyncKuduClient.NO_TIMESTAMP) {
+      client.updateLastPropagatedTimestamp(message.getPropagatedTimestamp());
     }
 
     if (message.hasCacheBlocks()) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/9748fd2c/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java
index d978c5b..e81d4bb 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java
@@ -30,6 +30,7 @@ import com.stumbleupon.async.Deferred;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import org.apache.kudu.client.Client.ScanTokenPB;
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
 
@@ -261,6 +262,32 @@ public class TestScannerMultiTablet extends BaseKuduTest {
     assertNotEquals(0, rowCount);
   }
 
+  @Test(timeout = 100000)
+  public void testScanTokenPropagatesTimestamp() throws Exception {
+    // Initially, the client does not have the timestamp set.
+    assertEquals(AsyncKuduClient.NO_TIMESTAMP, client.getLastPropagatedTimestamp());
+    assertEquals(KuduClient.NO_TIMESTAMP, syncClient.getLastPropagatedTimestamp());
+    AsyncKuduScanner scanner = client.newScannerBuilder(table).build();
+    KuduScanner syncScanner = new KuduScanner(scanner);
+
+    // Let the client receive the propagated timestamp in the scanner response.
+    syncScanner.nextRows().getNumRows();
+    final long tsPrev = client.getLastPropagatedTimestamp();
+    final long tsPropagated = tsPrev + 1000000;
+
+    ScanTokenPB.Builder pbBuilder = ScanTokenPB.newBuilder();
+    pbBuilder.setTableName(table.getName());
+    pbBuilder.setPropagatedTimestamp(tsPropagated);
+    Client.ScanTokenPB scanTokenPB = pbBuilder.build();
+    final byte[] serializedToken = KuduScanToken.serialize(scanTokenPB);
+
+    // Deserialize scan tokens and make sure the client's last propagated
+    // timestamp is updated accordingly.
+    assertEquals(tsPrev, client.getLastPropagatedTimestamp());
+    KuduScanToken.deserializeIntoScanner(serializedToken, syncClient);
+    assertEquals(tsPropagated, client.getLastPropagatedTimestamp());
+  }
+
   private AsyncKuduScanner getScanner(String lowerBoundKeyOne,
                                       String lowerBoundKeyTwo,
                                       String exclusiveUpperBoundKeyOne,