You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by jd...@apache.org on 2016/07/21 22:40:16 UTC

[3/3] incubator-kudu git commit: [java client] Integrate with the replay cache

[java client] Integrate with the replay cache

This patch adds the required functionality to have the Java client use the
server-side replay cache.

Change-Id: I108cd30acbc308bfb4577d072c2a8f26d1553c68
Reviewed-on: http://gerrit.cloudera.org:8080/3631
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/b57b02b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/b57b02b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/b57b02b9

Branch: refs/heads/master
Commit: b57b02b947512e1f40fd19df13cb89542eddb2c5
Parents: 0a79236
Author: Jean-Daniel Cryans <jd...@apache.org>
Authored: Wed Jul 13 09:28:24 2016 -0700
Committer: Jean-Daniel Cryans <jd...@apache.org>
Committed: Thu Jul 21 22:38:17 2016 +0000

----------------------------------------------------------------------
 .../java/org/kududb/client/AsyncKuduClient.java |  9 +++
 .../org/kududb/client/AsyncKuduScanner.java     |  2 +-
 .../src/main/java/org/kududb/client/Batch.java  |  5 ++
 .../main/java/org/kududb/client/KuduRpc.java    | 28 ++++++++
 .../main/java/org/kududb/client/Operation.java  |  5 ++
 .../java/org/kududb/client/RequestTracker.java  | 76 ++++++++++++++++++++
 .../java/org/kududb/client/TabletClient.java    | 15 ++++
 .../test/java/org/kududb/client/ITClient.java   |  1 -
 .../java/org/kududb/client/MiniKuduCluster.java |  1 +
 .../org/kududb/client/TestLeaderFailover.java   |  1 -
 .../org/kududb/client/TestRequestTracker.java   | 74 +++++++++++++++++++
 11 files changed, 214 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/b57b02b9/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java
index 2cf9efa..c1ccb3f 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java
@@ -81,6 +81,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -253,6 +254,8 @@ public class AsyncKuduClient implements AutoCloseable {
 
   private final boolean statisticsDisabled;
 
+  private final RequestTracker requestTracker;
+
   private volatile boolean closed;
 
   private AsyncKuduClient(AsyncKuduClientBuilder b) {
@@ -266,6 +269,8 @@ public class AsyncKuduClient implements AutoCloseable {
     this.statisticsDisabled = b.statisticsDisabled;
     statistics = statisticsDisabled ? null : new Statistics();
     this.timer = b.timer;
+    String clientId = UUID.randomUUID().toString().replace("-", "");
+    this.requestTracker = new RequestTracker(clientId);
   }
 
   /**
@@ -565,6 +570,10 @@ public class AsyncKuduClient implements AutoCloseable {
     return this.statistics;
   }
 
+  RequestTracker getRequestTracker() {
+    return requestTracker;
+  }
+
   /**
    * Creates a new {@link AsyncKuduScanner.AsyncKuduScannerBuilder} for a particular table.
    * @param table the name of the table you intend to scan.

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/b57b02b9/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduScanner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduScanner.java
index e94ad7d..7699536 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduScanner.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduScanner.java
@@ -797,7 +797,7 @@ public final class AsyncKuduScanner {
         case NEXT:
           setTablet(AsyncKuduScanner.this.tablet);
           builder.setScannerId(ZeroCopyLiteralByteString.wrap(scannerId))
-                 .setCallSeqId(sequenceId)
+                 .setCallSeqId(AsyncKuduScanner.this.sequenceId)
                  .setBatchSizeBytes(batchSizeBytes);
           break;
         case CLOSING:

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/b57b02b9/java/kudu-client/src/main/java/org/kududb/client/Batch.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/Batch.java b/java/kudu-client/src/main/java/org/kududb/client/Batch.java
index 3e3b960..ed1d870 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/Batch.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/Batch.java
@@ -148,6 +148,11 @@ class Batch extends KuduRpc<BatchResponse> {
   }
 
   @Override
+  boolean isRequestTracked() {
+    return true;
+  }
+
+  @Override
   void updateStatistics(Statistics statistics, BatchResponse response) {
     Slice tabletId = this.getTablet().getTabletId();
     String tableName = this.getTable().getName();

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/b57b02b9/java/kudu-client/src/main/java/org/kududb/client/KuduRpc.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/KuduRpc.java b/java/kudu-client/src/main/java/org/kududb/client/KuduRpc.java
index e7493a2..8ebd89d 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/KuduRpc.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/KuduRpc.java
@@ -104,6 +104,12 @@ public abstract class KuduRpc<R> {
    */
   byte attempt;  // package-private for TabletClient and AsyncKuduClient only.
 
+  /**
+   * Set by TabletClient when isRequestTracked returns true to identify this RPC in the sequence of
+   * RPCs sent by this client. Once it is set it should never change unless the RPC is reused.
+   */
+  long sequenceId = RequestTracker.NO_SEQ_NO;
+
   KuduRpc(KuduTable table) {
     this.table = table;
     this.deadlineTracker = new DeadlineTracker();
@@ -191,6 +197,10 @@ public abstract class KuduRpc<R> {
     }
     deferred = null;
     attempt = 0;
+    if (isRequestTracked()) {
+      table.getAsyncClient().getRequestTracker().rpcCompleted(sequenceId);
+      sequenceId = RequestTracker.NO_SEQ_NO;
+    }
     deadlineTracker.reset();
     d.callback(result);
   }
@@ -239,6 +249,24 @@ public abstract class KuduRpc<R> {
     deadlineTracker.setDeadline(timeout);
   }
 
+  /**
+   * If this RPC needs to be tracked on the client and server-side. Some RPCs require exactly-once
+   * semantics which is enabled by tracking them.
+   * @return true if the request has to be tracked, else false
+   */
+  boolean isRequestTracked() {
+    return false;
+  }
+
+  long getSequenceId() {
+    return sequenceId;
+  }
+
+  void setSequenceId(long sequenceId) {
+    assert (this.sequenceId == RequestTracker.NO_SEQ_NO);
+    this.sequenceId = sequenceId;
+  }
+
   public String toString() {
 
     final StringBuilder buf = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/b57b02b9/java/kudu-client/src/main/java/org/kududb/client/Operation.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/Operation.java b/java/kudu-client/src/main/java/org/kududb/client/Operation.java
index 0e67c4e..e27c222 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/Operation.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/Operation.java
@@ -159,6 +159,11 @@ public abstract class Operation extends KuduRpc<OperationResponse> {
     return this.getTable().getPartitionSchema().encodePartitionKey(row);
   }
 
+  @Override
+  boolean isRequestTracked() {
+    return true;
+  }
+
   /**
    * Get the underlying row to modify.
    * @return a partial row that will be sent with this Operation

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/b57b02b9/java/kudu-client/src/main/java/org/kududb/client/RequestTracker.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/RequestTracker.java b/java/kudu-client/src/main/java/org/kududb/client/RequestTracker.java
new file mode 100644
index 0000000..229b64f
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/kududb/client/RequestTracker.java
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.kududb.client;
+
+import org.kududb.annotations.InterfaceAudience;
+
+import java.util.Queue;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * This is the same class as src/kudu/rpc/request_tracker.h.
+ */
+@InterfaceAudience.Private
+public class RequestTracker {
+  private final AtomicLong sequenceIdTracker = new AtomicLong();
+  private final Queue<Long> incompleteRpcs = new PriorityBlockingQueue<>();
+
+  static final long NO_SEQ_NO = -1;
+
+  private final String clientId;
+
+  /**
+   * Create a new request tracker for the given client id.
+   * @param clientId identifier for the client this tracker belongs to
+   */
+  public RequestTracker(String clientId) {
+    this.clientId = clientId;
+  }
+
+  /**
+   * Generates a new sequence number and tracks it.
+   * @return a new sequence number
+   */
+  public long newSeqNo() {
+    Long next = sequenceIdTracker.incrementAndGet();
+    incompleteRpcs.add(next);
+    return next;
+  }
+
+  /**
+   * Returns the oldest sequence number that wasn't marked as completed. If there is no incomplete
+   * RPC then {@link RequestTracker#NO_SEQ_NO} is returned.
+   * @return the first incomplete sequence number
+   */
+  public long firstIncomplete() {
+    Long peek = incompleteRpcs.peek();
+    return peek == null ? NO_SEQ_NO : peek;
+  }
+
+  /**
+   * Marks the given sequence id as complete. This operation is idempotent.
+   * @param sequenceId the sequence id to mark as complete
+   */
+  public void rpcCompleted(long sequenceId) {
+    incompleteRpcs.remove(sequenceId);
+  }
+
+  public String getClientId() {
+    return clientId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/b57b02b9/java/kudu-client/src/main/java/org/kududb/client/TabletClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/TabletClient.java b/java/kudu-client/src/main/java/org/kududb/client/TabletClient.java
index 8f6a91f..af24246 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/TabletClient.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/TabletClient.java
@@ -143,12 +143,15 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
 
   private SecureRpcHelper secureRpcHelper;
 
+  private final RequestTracker requestTracker;
+
   public TabletClient(AsyncKuduClient client, String uuid, String host, int port) {
     this.kuduClient = client;
     this.uuid = uuid;
     this.socketReadTimeoutMs = client.getDefaultSocketReadTimeoutMs();
     this.host = host;
     this.port = port;
+    this.requestTracker = client.getRequestTracker();
   }
 
   <R> void sendRpc(KuduRpc<R> rpc) {
@@ -242,6 +245,18 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
         headerBuilder.setTimeoutMillis((int) Math.min(millisBeforeDeadline, localRpcTimeoutMs));
       }
 
+      if (rpc.isRequestTracked()) {
+        RpcHeader.RequestIdPB.Builder requestIdBuilder = RpcHeader.RequestIdPB.newBuilder();
+        if (rpc.getSequenceId() == RequestTracker.NO_SEQ_NO) {
+          rpc.setSequenceId(requestTracker.newSeqNo());
+        }
+        requestIdBuilder.setClientId(requestTracker.getClientId());
+        requestIdBuilder.setSeqNo(rpc.getSequenceId());
+        requestIdBuilder.setAttemptNo(rpc.attempt);
+        requestIdBuilder.setFirstIncompleteSeqNo(requestTracker.firstIncomplete());
+        headerBuilder.setRequestId(requestIdBuilder);
+      }
+
       payload = rpc.serialize(headerBuilder.build());
     } catch (Exception e) {
         LOG.error("Uncaught exception while serializing RPC: " + rpc, e);

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/b57b02b9/java/kudu-client/src/test/java/org/kududb/client/ITClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/kududb/client/ITClient.java b/java/kudu-client/src/test/java/org/kududb/client/ITClient.java
index a684596..cb8e968 100644
--- a/java/kudu-client/src/test/java/org/kududb/client/ITClient.java
+++ b/java/kudu-client/src/test/java/org/kududb/client/ITClient.java
@@ -236,7 +236,6 @@ public class ITClient extends BaseKuduTest {
 
     @Override
     public void run() {
-      session.setIgnoreAllDuplicateRows(true);
       while (KEEP_RUNNING_LATCH.getCount() > 0) {
         try {
           OperationResponse resp = session.apply(createBasicSchemaInsert(table, currentRowKey));

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/b57b02b9/java/kudu-client/src/test/java/org/kududb/client/MiniKuduCluster.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/kududb/client/MiniKuduCluster.java b/java/kudu-client/src/test/java/org/kududb/client/MiniKuduCluster.java
index 9dbd181..955e6ab 100644
--- a/java/kudu-client/src/test/java/org/kududb/client/MiniKuduCluster.java
+++ b/java/kudu-client/src/test/java/org/kududb/client/MiniKuduCluster.java
@@ -128,6 +128,7 @@ public class MiniKuduCluster implements AutoCloseable {
           "--fs_wal_dir=" + dataDirPath,
           "--fs_data_dirs=" + dataDirPath,
           "--flush_threshold_mb=1",
+          "--enable_exactly_once",
           "--tserver_master_addrs=" + masterAddresses,
           "--webserver_interface=" + localhost,
           "--local_ip_for_outbound_sockets=" + localhost,

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/b57b02b9/java/kudu-client/src/test/java/org/kududb/client/TestLeaderFailover.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/kududb/client/TestLeaderFailover.java b/java/kudu-client/src/test/java/org/kududb/client/TestLeaderFailover.java
index 08926f7..49ac502 100644
--- a/java/kudu-client/src/test/java/org/kududb/client/TestLeaderFailover.java
+++ b/java/kudu-client/src/test/java/org/kududb/client/TestLeaderFailover.java
@@ -46,7 +46,6 @@ public class TestLeaderFailover extends BaseKuduTest {
   @Test(timeout = 100000)
   public void testFailover() throws Exception {
     KuduSession session = syncClient.newSession();
-    session.setIgnoreAllDuplicateRows(true);
     for (int i = 0; i < 3; i++) {
       session.apply(createBasicSchemaInsert(table, i));
     }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/b57b02b9/java/kudu-client/src/test/java/org/kududb/client/TestRequestTracker.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/kududb/client/TestRequestTracker.java b/java/kudu-client/src/test/java/org/kududb/client/TestRequestTracker.java
new file mode 100644
index 0000000..7528de6
--- /dev/null
+++ b/java/kudu-client/src/test/java/org/kududb/client/TestRequestTracker.java
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.kududb.client;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+public class TestRequestTracker {
+
+  @Test(timeout = 10000)
+  public void test() {
+    RequestTracker tracker = new RequestTracker("test");
+
+    // A new tracker should have no incomplete RPCs.
+    assertEquals(RequestTracker.NO_SEQ_NO, tracker.firstIncomplete());
+
+    int max = 10;
+
+    for (int i = 0; i < max; i++) {
+      tracker.newSeqNo();
+    }
+
+    // The first RPC is the incomplete one.
+    assertEquals(1, tracker.firstIncomplete());
+
+    // Mark the first as complete, incomplete should advance by 1.
+    tracker.rpcCompleted(1);
+    assertEquals(2, tracker.firstIncomplete());
+
+    // Mark the RPC in the middle as complete, first incomplete doesn't change.
+    tracker.rpcCompleted(5);
+    assertEquals(2, tracker.firstIncomplete());
+
+    // Mark the first half as complete.
+    // Note that we're also testing that rpcCompleted is idempotent.
+    for (int i = 1; i < max / 2; i++) {
+      tracker.rpcCompleted(i);
+    }
+
+    assertEquals(6, tracker.firstIncomplete());
+
+    // Get a few more sequence numbers.
+    long lastSeqNo = 0;
+    for (int i = max / 2; i <= max; i++) {
+      lastSeqNo = tracker.newSeqNo();
+    }
+
+    // Mark them all as complete except the last one.
+    while (tracker.firstIncomplete() != lastSeqNo) {
+      tracker.rpcCompleted(tracker.firstIncomplete());
+    }
+
+    assertEquals(lastSeqNo, tracker.firstIncomplete());
+    tracker.rpcCompleted(lastSeqNo);
+
+    // Test that we get back to NO_SEQ_NO after marking them all.
+    assertEquals(RequestTracker.NO_SEQ_NO, tracker.firstIncomplete());
+  }
+}