You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by jd...@apache.org on 2016/10/27 23:09:09 UTC
[4/5] kudu git commit: KUDU-1715. Add a way to set ReplicaSelection
to the java client
KUDU-1715. Add a way to set ReplicaSelection to the java client
This patch adds a ReplicaSelection option for all the RPCs, which always defaults
to LEADER_ONLY, as well as a way to change it for scanners.
Change-Id: I3bb08c9c78271a0065c8aa8fb9b0f3301f84e828
Reviewed-on: http://gerrit.cloudera.org:8080/4837
Tested-by: Kudu Jenkins
Reviewed-by: Jean-Daniel Cryans <jd...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/a7fd4fc8
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/a7fd4fc8
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/a7fd4fc8
Branch: refs/heads/master
Commit: a7fd4fc85c0878570a5ab51a8e28275fd4459ff5
Parents: a326fc9
Author: Jean-Daniel Cryans <jd...@apache.org>
Authored: Tue Oct 25 09:51:42 2016 -0700
Committer: Jean-Daniel Cryans <jd...@apache.org>
Committed: Thu Oct 27 18:40:35 2016 +0000
----------------------------------------------------------------------
.../kudu/client/AbstractKuduScannerBuilder.java | 12 +++++++
.../org/apache/kudu/client/AsyncKuduClient.java | 17 +++++----
.../apache/kudu/client/AsyncKuduScanner.java | 21 +++++++-----
.../java/org/apache/kudu/client/KuduRpc.java | 4 +++
.../org/apache/kudu/client/KuduScanner.java | 2 +-
.../org/apache/kudu/client/RemoteTablet.java | 17 +++++++++
.../apache/kudu/client/ReplicaSelection.java | 36 ++++++++++++++++++++
.../apache/kudu/client/TestRemoteTablet.java | 8 +++++
.../kudu/client/TestScannerMultiTablet.java | 15 ++++++++
9 files changed, 115 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/a7fd4fc8/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java
index c23e07e..f58ae41 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java
@@ -54,6 +54,7 @@ public abstract class AbstractKuduScannerBuilder
List<String> projectedColumnNames = null;
List<Integer> projectedColumnIndexes = null;
long scanRequestTimeout;
+ ReplicaSelection replicaSelection = ReplicaSelection.LEADER_ONLY;
AbstractKuduScannerBuilder(AsyncKuduClient client, KuduTable table) {
this.client = client;
@@ -313,6 +314,17 @@ public abstract class AbstractKuduScannerBuilder
}
/**
+ * Sets the replica selection mechanism for this scanner. The default is
+ * {@link ReplicaSelection#LEADER_ONLY}, i.e. to read from the currently known leader.
+ * @param replicaSelection replica selection mechanism to use
+ * @return this instance
+ */
+ public S replicaSelection(ReplicaSelection replicaSelection) {
+ this.replicaSelection = replicaSelection;
+ return (S) this;
+ }
+
+ /**
* Set an encoded (inclusive) start partition key for the scan.
*
* @param partitionKey the encoded partition key
http://git-wip-us.apache.org/repos/asf/kudu/blob/a7fd4fc8/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index b36737b..35cd9d6 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -586,8 +586,9 @@ public class AsyncKuduClient implements AutoCloseable {
Deferred<AsyncKuduScanner.Response> scanNextRows(final AsyncKuduScanner scanner) {
RemoteTablet tablet = scanner.currentTablet();
assert (tablet != null);
- TabletClient client = connectionCache.getClient(tablet.getLeaderUUID());
KuduRpc<AsyncKuduScanner.Response> nextRequest = scanner.getNextRowsRequest();
+ TabletClient client =
+ connectionCache.getClient(tablet.getReplicaSelectedUUID(nextRequest.getReplicaSelection()));
Deferred<AsyncKuduScanner.Response> d = nextRequest.getDeferred();
// Important to increment the attempts before the next if statement since
// getSleepTimeForRpc() relies on it if the client is null or dead.
@@ -615,7 +616,9 @@ public class AsyncKuduClient implements AutoCloseable {
return Deferred.fromResult(null);
}
- final TabletClient client = connectionCache.getClient(tablet.getLeaderUUID());
+ final KuduRpc<AsyncKuduScanner.Response> closeRequest = scanner.getCloseRequest();
+ final TabletClient client = connectionCache.getClient(
+ tablet.getReplicaSelectedUUID(closeRequest.getReplicaSelection()));
if (client == null || !client.isAlive()) {
// Oops, we couldn't find a tablet server that hosts this tablet. Our
// cache was probably invalidated while the client was scanning. So
@@ -623,10 +626,10 @@ public class AsyncKuduClient implements AutoCloseable {
LOG.warn("Cannot close {} properly, no connection open for {}", scanner, tablet);
return Deferred.fromResult(null);
}
- final KuduRpc<AsyncKuduScanner.Response> close_request = scanner.getCloseRequest();
- final Deferred<AsyncKuduScanner.Response> d = close_request.getDeferred();
- close_request.attempt++;
- client.sendRpc(close_request);
+
+ final Deferred<AsyncKuduScanner.Response> d = closeRequest.getDeferred();
+ closeRequest.attempt++;
+ client.sendRpc(closeRequest);
return d;
}
@@ -670,7 +673,7 @@ public class AsyncKuduClient implements AutoCloseable {
// If we found a tablet, we'll try to find the TS to talk to.
if (entry != null) {
RemoteTablet tablet = entry.getTablet();
- String uuid = tablet.getLeaderUUID();
+ String uuid = tablet.getReplicaSelectedUUID(request.getReplicaSelection());
if (uuid != null) {
Deferred<R> d = request.getDeferred();
request.setTablet(tablet);
http://git-wip-us.apache.org/repos/asf/kudu/blob/a7fd4fc8/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
index c61fbdb..b51394a 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
@@ -178,6 +178,8 @@ public final class AsyncKuduScanner {
private final long htTimestamp;
+ private final ReplicaSelection replicaSelection;
+
/////////////////////
// Runtime variables.
/////////////////////
@@ -212,8 +214,6 @@ public final class AsyncKuduScanner {
private Deferred<RowResultIterator> prefetcherDeferred;
- private boolean inFirstTablet = true;
-
final long scanRequestTimeout;
AsyncKuduScanner(AsyncKuduClient client, KuduTable table, List<String> projectedNames,
@@ -222,7 +222,8 @@ public final class AsyncKuduScanner {
Map<String, KuduPredicate> predicates, long limit,
boolean cacheBlocks, boolean prefetching,
byte[] startPrimaryKey, byte[] endPrimaryKey,
- long htTimestamp, int batchSizeBytes, PartitionPruner pruner) {
+ long htTimestamp, int batchSizeBytes, PartitionPruner pruner,
+ ReplicaSelection replicaSelection) {
checkArgument(batchSizeBytes > 0, "Need a strictly positive number of bytes, " +
"got %s", batchSizeBytes);
checkArgument(limit > 0, "Need a strictly positive number for the limit, " +
@@ -280,6 +281,8 @@ public final class AsyncKuduScanner {
this.hasMore = false;
this.closed = true;
}
+
+ this.replicaSelection = replicaSelection;
}
/**
@@ -586,11 +589,6 @@ public final class AsyncKuduScanner {
*/
KuduRpc<Response> getOpenRequest() {
checkScanningNotStarted();
- // This is the only point where we know we haven't started scanning and where the scanner
- // should be fully configured
- if (this.inFirstTablet) {
- this.inFirstTablet = false;
- }
return new ScanRequest(table, State.OPENING);
}
@@ -684,6 +682,11 @@ public final class AsyncKuduScanner {
}
}
+ @Override
+ ReplicaSelection getReplicaSelection() {
+ return replicaSelection;
+ }
+
/** Serializes this request. */
ChannelBuffer serialize(Message header) {
final ScanRequestPB.Builder builder = ScanRequestPB.newBuilder();
@@ -818,7 +821,7 @@ public final class AsyncKuduScanner {
client, table, projectedColumnNames, projectedColumnIndexes, readMode, orderMode,
scanRequestTimeout, predicates, limit, cacheBlocks,
prefetching, lowerBoundPrimaryKey, upperBoundPrimaryKey,
- htTimestamp, batchSizeBytes, PartitionPruner.create(this));
+ htTimestamp, batchSizeBytes, PartitionPruner.create(this), replicaSelection);
}
}
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/a7fd4fc8/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
index 02b547e..8b5c0df 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
@@ -262,6 +262,10 @@ public abstract class KuduRpc<R> {
return sequenceId;
}
+ ReplicaSelection getReplicaSelection() {
+ return ReplicaSelection.LEADER_ONLY;
+ }
+
void setSequenceId(long sequenceId) {
assert (this.sequenceId == RequestTracker.NO_SEQ_NO);
this.sequenceId = sequenceId;
http://git-wip-us.apache.org/repos/asf/kudu/blob/a7fd4fc8/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
index 4a4bcc1..1cccd10 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
@@ -142,7 +142,7 @@ public class KuduScanner {
client, table, projectedColumnNames, projectedColumnIndexes, readMode, orderMode,
scanRequestTimeout, predicates, limit, cacheBlocks,
prefetching, lowerBoundPrimaryKey, upperBoundPrimaryKey,
- htTimestamp, batchSizeBytes, PartitionPruner.create(this)));
+ htTimestamp, batchSizeBytes, PartitionPruner.create(this), replicaSelection));
}
}
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/a7fd4fc8/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
index 28be0fe..d98f21d 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
@@ -165,6 +165,23 @@ class RemoteTablet implements Comparable<RemoteTablet> {
}
/**
+ * Dispatches to {@code getLeaderUUID()} for LEADER_ONLY or {@code getClosestUUID()} for
+ * CLOSEST_REPLICA; throws a RuntimeException for any other replica selection mechanism.
+ * @param replicaSelection replica selection mechanism to use
+ * @return a UUID for the server that matches the selection, can be null
+ */
+ String getReplicaSelectedUUID(ReplicaSelection replicaSelection) {
+ switch (replicaSelection) {
+ case LEADER_ONLY:
+ return getLeaderUUID();
+ case CLOSEST_REPLICA:
+ return getClosestUUID();
+ default:
+ throw new RuntimeException("Unknown replica selection mechanism " + replicaSelection);
+ }
+ }
+
+ /**
* Gets the replicas of this tablet. The returned list may not be mutated.
* @return the replicas of the tablet
*/
http://git-wip-us.apache.org/repos/asf/kudu/blob/a7fd4fc8/java/kudu-client/src/main/java/org/apache/kudu/client/ReplicaSelection.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ReplicaSelection.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ReplicaSelection.java
new file mode 100644
index 0000000..e33ba34
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ReplicaSelection.java
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.client;
+
+import org.apache.kudu.annotations.InterfaceAudience;
+import org.apache.kudu.annotations.InterfaceStability;
+
+/**
+ * Policy with which to choose amongst multiple replicas of a tablet when sending an RPC.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public enum ReplicaSelection {
+ /**
+ * Select the LEADER replica.
+ */
+ LEADER_ONLY,
+ /**
+ * Select the closest replica to the client, or a random one if all replicas are equidistant.
+ */
+ CLOSEST_REPLICA
+}
http://git-wip-us.apache.org/repos/asf/kudu/blob/a7fd4fc8/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java
index aa50ad6..ea77c27 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java
@@ -101,6 +101,14 @@ public class TestRemoteTablet {
assertNotNull(tablet.getClosestUUID());
}
+ @Test
+ public void testReplicaSelection() {
+ RemoteTablet tablet = getTablet(0, 1);
+
+ assertEquals("0", tablet.getReplicaSelectedUUID(ReplicaSelection.LEADER_ONLY));
+ assertEquals("1", tablet.getReplicaSelectedUUID(ReplicaSelection.CLOSEST_REPLICA));
+ }
+
private RemoteTablet getTablet(int leaderIndex) {
return getTablet(leaderIndex, -1);
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/a7fd4fc8/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java
index 71739d4..14454c0 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java
@@ -176,6 +176,21 @@ public class TestScannerMultiTablet extends BaseKuduTest {
buildScannerAndCheckColumnsCount(builder, 2);
}
+ @Test(timeout = 100000)
+ public void testReplicaSelections() throws Exception {
+ AsyncKuduScanner scanner = client.newScannerBuilder(table)
+ .replicaSelection(ReplicaSelection.LEADER_ONLY)
+ .build();
+
+ assertEquals(9, countRowsInScan(scanner));
+
+ scanner = client.newScannerBuilder(table)
+ .replicaSelection(ReplicaSelection.CLOSEST_REPLICA)
+ .build();
+
+ assertEquals(9, countRowsInScan(scanner));
+ }
+
private AsyncKuduScanner getScanner(String lowerBoundKeyOne,
String lowerBoundKeyTwo,
String exclusiveUpperBoundKeyOne,