You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by jd...@apache.org on 2016/07/21 22:40:16 UTC
[3/3] incubator-kudu git commit: [java client] Integrate with the replay cache
[java client] Integrate with the replay cache
This patch adds the required functionality to have the Java client use the
server-side replay cache.
Change-Id: I108cd30acbc308bfb4577d072c2a8f26d1553c68
Reviewed-on: http://gerrit.cloudera.org:8080/3631
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/b57b02b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/b57b02b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/b57b02b9
Branch: refs/heads/master
Commit: b57b02b947512e1f40fd19df13cb89542eddb2c5
Parents: 0a79236
Author: Jean-Daniel Cryans <jd...@apache.org>
Authored: Wed Jul 13 09:28:24 2016 -0700
Committer: Jean-Daniel Cryans <jd...@apache.org>
Committed: Thu Jul 21 22:38:17 2016 +0000
----------------------------------------------------------------------
.../java/org/kududb/client/AsyncKuduClient.java | 9 +++
.../org/kududb/client/AsyncKuduScanner.java | 2 +-
.../src/main/java/org/kududb/client/Batch.java | 5 ++
.../main/java/org/kududb/client/KuduRpc.java | 28 ++++++++
.../main/java/org/kududb/client/Operation.java | 5 ++
.../java/org/kududb/client/RequestTracker.java | 76 ++++++++++++++++++++
.../java/org/kududb/client/TabletClient.java | 15 ++++
.../test/java/org/kududb/client/ITClient.java | 1 -
.../java/org/kududb/client/MiniKuduCluster.java | 1 +
.../org/kududb/client/TestLeaderFailover.java | 1 -
.../org/kududb/client/TestRequestTracker.java | 74 +++++++++++++++++++
11 files changed, 214 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/b57b02b9/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java
index 2cf9efa..c1ccb3f 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java
@@ -81,6 +81,7 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -253,6 +254,8 @@ public class AsyncKuduClient implements AutoCloseable {
private final boolean statisticsDisabled;
+ private final RequestTracker requestTracker;
+
private volatile boolean closed;
private AsyncKuduClient(AsyncKuduClientBuilder b) {
@@ -266,6 +269,8 @@ public class AsyncKuduClient implements AutoCloseable {
this.statisticsDisabled = b.statisticsDisabled;
statistics = statisticsDisabled ? null : new Statistics();
this.timer = b.timer;
+ String clientId = UUID.randomUUID().toString().replace("-", "");
+ this.requestTracker = new RequestTracker(clientId);
}
/**
@@ -565,6 +570,10 @@ public class AsyncKuduClient implements AutoCloseable {
return this.statistics;
}
+ RequestTracker getRequestTracker() {
+ return requestTracker;
+ }
+
/**
* Creates a new {@link AsyncKuduScanner.AsyncKuduScannerBuilder} for a particular table.
* @param table the name of the table you intend to scan.
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/b57b02b9/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduScanner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduScanner.java
index e94ad7d..7699536 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduScanner.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduScanner.java
@@ -797,7 +797,7 @@ public final class AsyncKuduScanner {
case NEXT:
setTablet(AsyncKuduScanner.this.tablet);
builder.setScannerId(ZeroCopyLiteralByteString.wrap(scannerId))
- .setCallSeqId(sequenceId)
+ .setCallSeqId(AsyncKuduScanner.this.sequenceId)
.setBatchSizeBytes(batchSizeBytes);
break;
case CLOSING:
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/b57b02b9/java/kudu-client/src/main/java/org/kududb/client/Batch.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/Batch.java b/java/kudu-client/src/main/java/org/kududb/client/Batch.java
index 3e3b960..ed1d870 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/Batch.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/Batch.java
@@ -148,6 +148,11 @@ class Batch extends KuduRpc<BatchResponse> {
}
@Override
+ boolean isRequestTracked() {
+ return true;
+ }
+
+ @Override
void updateStatistics(Statistics statistics, BatchResponse response) {
Slice tabletId = this.getTablet().getTabletId();
String tableName = this.getTable().getName();
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/b57b02b9/java/kudu-client/src/main/java/org/kududb/client/KuduRpc.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/KuduRpc.java b/java/kudu-client/src/main/java/org/kududb/client/KuduRpc.java
index e7493a2..8ebd89d 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/KuduRpc.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/KuduRpc.java
@@ -104,6 +104,12 @@ public abstract class KuduRpc<R> {
*/
byte attempt; // package-private for TabletClient and AsyncKuduClient only.
+ /**
+ * Set by TabletClient when isRequestTracked returns true to identify this RPC in the sequence of
+ * RPCs sent by this client. Once it is set it should never change unless the RPC is reused.
+ */
+ long sequenceId = RequestTracker.NO_SEQ_NO;
+
KuduRpc(KuduTable table) {
this.table = table;
this.deadlineTracker = new DeadlineTracker();
@@ -191,6 +197,10 @@ public abstract class KuduRpc<R> {
}
deferred = null;
attempt = 0;
+ if (isRequestTracked()) {
+ table.getAsyncClient().getRequestTracker().rpcCompleted(sequenceId);
+ sequenceId = RequestTracker.NO_SEQ_NO;
+ }
deadlineTracker.reset();
d.callback(result);
}
@@ -239,6 +249,24 @@ public abstract class KuduRpc<R> {
deadlineTracker.setDeadline(timeout);
}
+ /**
+ * If this RPC needs to be tracked on the client and server-side. Some RPCs require exactly-once
+ * semantics which is enabled by tracking them.
+ * @return true if the request has to be tracked, else false
+ */
+ boolean isRequestTracked() {
+ return false;
+ }
+
+ long getSequenceId() {
+ return sequenceId;
+ }
+
+ void setSequenceId(long sequenceId) {
+ assert (this.sequenceId == RequestTracker.NO_SEQ_NO);
+ this.sequenceId = sequenceId;
+ }
+
public String toString() {
final StringBuilder buf = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/b57b02b9/java/kudu-client/src/main/java/org/kududb/client/Operation.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/Operation.java b/java/kudu-client/src/main/java/org/kududb/client/Operation.java
index 0e67c4e..e27c222 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/Operation.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/Operation.java
@@ -159,6 +159,11 @@ public abstract class Operation extends KuduRpc<OperationResponse> {
return this.getTable().getPartitionSchema().encodePartitionKey(row);
}
+ @Override
+ boolean isRequestTracked() {
+ return true;
+ }
+
/**
* Get the underlying row to modify.
* @return a partial row that will be sent with this Operation
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/b57b02b9/java/kudu-client/src/main/java/org/kududb/client/RequestTracker.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/RequestTracker.java b/java/kudu-client/src/main/java/org/kududb/client/RequestTracker.java
new file mode 100644
index 0000000..229b64f
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/kududb/client/RequestTracker.java
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.kududb.client;
+
+import org.kududb.annotations.InterfaceAudience;
+
+import java.util.Queue;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * This is the same class as src/kudu/rpc/request_tracker.h.
+ */
+@InterfaceAudience.Private
+public class RequestTracker {
+ private final AtomicLong sequenceIdTracker = new AtomicLong();
+ private final Queue<Long> incompleteRpcs = new PriorityBlockingQueue<>();
+
+ static final long NO_SEQ_NO = -1;
+
+ private final String clientId;
+
+ /**
+ * Create a new request tracker for the given client id.
+ * @param clientId identifier for the client this tracker belongs to
+ */
+ public RequestTracker(String clientId) {
+ this.clientId = clientId;
+ }
+
+ /**
+ * Generates a new sequence number and tracks it.
+ * @return a new sequence number
+ */
+ public long newSeqNo() {
+ Long next = sequenceIdTracker.incrementAndGet();
+ incompleteRpcs.add(next);
+ return next;
+ }
+
+ /**
+ * Returns the oldest sequence number that wasn't marked as completed. If there is no incomplete
+ * RPC then {@link RequestTracker#NO_SEQ_NO} is returned.
+ * @return the first incomplete sequence number
+ */
+ public long firstIncomplete() {
+ Long peek = incompleteRpcs.peek();
+ return peek == null ? NO_SEQ_NO : peek;
+ }
+
+ /**
+ * Marks the given sequence id as complete. This operation is idempotent.
+ * @param sequenceId the sequence id to mark as complete
+ */
+ public void rpcCompleted(long sequenceId) {
+ incompleteRpcs.remove(sequenceId);
+ }
+
+ public String getClientId() {
+ return clientId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/b57b02b9/java/kudu-client/src/main/java/org/kududb/client/TabletClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/TabletClient.java b/java/kudu-client/src/main/java/org/kududb/client/TabletClient.java
index 8f6a91f..af24246 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/TabletClient.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/TabletClient.java
@@ -143,12 +143,15 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
private SecureRpcHelper secureRpcHelper;
+ private final RequestTracker requestTracker;
+
public TabletClient(AsyncKuduClient client, String uuid, String host, int port) {
this.kuduClient = client;
this.uuid = uuid;
this.socketReadTimeoutMs = client.getDefaultSocketReadTimeoutMs();
this.host = host;
this.port = port;
+ this.requestTracker = client.getRequestTracker();
}
<R> void sendRpc(KuduRpc<R> rpc) {
@@ -242,6 +245,18 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
headerBuilder.setTimeoutMillis((int) Math.min(millisBeforeDeadline, localRpcTimeoutMs));
}
+ if (rpc.isRequestTracked()) {
+ RpcHeader.RequestIdPB.Builder requestIdBuilder = RpcHeader.RequestIdPB.newBuilder();
+ if (rpc.getSequenceId() == RequestTracker.NO_SEQ_NO) {
+ rpc.setSequenceId(requestTracker.newSeqNo());
+ }
+ requestIdBuilder.setClientId(requestTracker.getClientId());
+ requestIdBuilder.setSeqNo(rpc.getSequenceId());
+ requestIdBuilder.setAttemptNo(rpc.attempt);
+ requestIdBuilder.setFirstIncompleteSeqNo(requestTracker.firstIncomplete());
+ headerBuilder.setRequestId(requestIdBuilder);
+ }
+
payload = rpc.serialize(headerBuilder.build());
} catch (Exception e) {
LOG.error("Uncaught exception while serializing RPC: " + rpc, e);
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/b57b02b9/java/kudu-client/src/test/java/org/kududb/client/ITClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/kududb/client/ITClient.java b/java/kudu-client/src/test/java/org/kududb/client/ITClient.java
index a684596..cb8e968 100644
--- a/java/kudu-client/src/test/java/org/kududb/client/ITClient.java
+++ b/java/kudu-client/src/test/java/org/kududb/client/ITClient.java
@@ -236,7 +236,6 @@ public class ITClient extends BaseKuduTest {
@Override
public void run() {
- session.setIgnoreAllDuplicateRows(true);
while (KEEP_RUNNING_LATCH.getCount() > 0) {
try {
OperationResponse resp = session.apply(createBasicSchemaInsert(table, currentRowKey));
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/b57b02b9/java/kudu-client/src/test/java/org/kududb/client/MiniKuduCluster.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/kududb/client/MiniKuduCluster.java b/java/kudu-client/src/test/java/org/kududb/client/MiniKuduCluster.java
index 9dbd181..955e6ab 100644
--- a/java/kudu-client/src/test/java/org/kududb/client/MiniKuduCluster.java
+++ b/java/kudu-client/src/test/java/org/kududb/client/MiniKuduCluster.java
@@ -128,6 +128,7 @@ public class MiniKuduCluster implements AutoCloseable {
"--fs_wal_dir=" + dataDirPath,
"--fs_data_dirs=" + dataDirPath,
"--flush_threshold_mb=1",
+ "--enable_exactly_once",
"--tserver_master_addrs=" + masterAddresses,
"--webserver_interface=" + localhost,
"--local_ip_for_outbound_sockets=" + localhost,
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/b57b02b9/java/kudu-client/src/test/java/org/kududb/client/TestLeaderFailover.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/kududb/client/TestLeaderFailover.java b/java/kudu-client/src/test/java/org/kududb/client/TestLeaderFailover.java
index 08926f7..49ac502 100644
--- a/java/kudu-client/src/test/java/org/kududb/client/TestLeaderFailover.java
+++ b/java/kudu-client/src/test/java/org/kududb/client/TestLeaderFailover.java
@@ -46,7 +46,6 @@ public class TestLeaderFailover extends BaseKuduTest {
@Test(timeout = 100000)
public void testFailover() throws Exception {
KuduSession session = syncClient.newSession();
- session.setIgnoreAllDuplicateRows(true);
for (int i = 0; i < 3; i++) {
session.apply(createBasicSchemaInsert(table, i));
}
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/b57b02b9/java/kudu-client/src/test/java/org/kududb/client/TestRequestTracker.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/kududb/client/TestRequestTracker.java b/java/kudu-client/src/test/java/org/kududb/client/TestRequestTracker.java
new file mode 100644
index 0000000..7528de6
--- /dev/null
+++ b/java/kudu-client/src/test/java/org/kududb/client/TestRequestTracker.java
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.kududb.client;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+public class TestRequestTracker {
+
+  /**
+   * Walks a single tracker through issuing sequence numbers, completing them in order, out of
+   * order and repeatedly, and finally draining it, checking the first incomplete sequence number
+   * after every step.
+   */
+  @Test(timeout = 10000)
+  public void test() {
+    final int batchSize = 10;
+    final int half = batchSize / 2;
+    RequestTracker requestTracker = new RequestTracker("test");
+
+    // Nothing has been issued yet, so nothing can be incomplete.
+    assertEquals(RequestTracker.NO_SEQ_NO, requestTracker.firstIncomplete());
+
+    // Issue the first batch of sequence numbers.
+    int issued = 0;
+    while (issued < batchSize) {
+      requestTracker.newSeqNo();
+      issued++;
+    }
+
+    // Numbering starts at 1, and that one is still pending.
+    assertEquals(1, requestTracker.firstIncomplete());
+
+    // Completing the oldest RPC moves the first incomplete forward by one.
+    requestTracker.rpcCompleted(1);
+    assertEquals(2, requestTracker.firstIncomplete());
+
+    // Completing an RPC in the middle leaves the first incomplete where it was.
+    requestTracker.rpcCompleted(5);
+    assertEquals(2, requestTracker.firstIncomplete());
+
+    // Complete everything below the midpoint. The first RPC is completed a second time here,
+    // which also checks that rpcCompleted is idempotent.
+    for (int seqNo = 1; seqNo < half; seqNo++) {
+      requestTracker.rpcCompleted(seqNo);
+    }
+
+    assertEquals(6, requestTracker.firstIncomplete());
+
+    // Issue some more sequence numbers, keeping hold of the newest one.
+    long newestSeqNo = 0;
+    for (int remaining = batchSize - half + 1; remaining > 0; remaining--) {
+      newestSeqNo = requestTracker.newSeqNo();
+    }
+
+    // Complete the oldest pending RPC over and over until only the newest one is left.
+    for (long oldest = requestTracker.firstIncomplete();
+         oldest != newestSeqNo;
+         oldest = requestTracker.firstIncomplete()) {
+      requestTracker.rpcCompleted(oldest);
+    }
+
+    assertEquals(newestSeqNo, requestTracker.firstIncomplete());
+    requestTracker.rpcCompleted(newestSeqNo);
+
+    // With every RPC completed the tracker reports NO_SEQ_NO again.
+    assertEquals(RequestTracker.NO_SEQ_NO, requestTracker.firstIncomplete());
+  }
+}