You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by jd...@apache.org on 2016/10/27 23:09:09 UTC

[4/5] kudu git commit: KUDU-1715. Add a way to set ReplicaSelection to the java client

KUDU-1715. Add a way to set ReplicaSelection to the java client

This patch adds a ReplicaSelection option for all the RPCs, which always defaults
to LEADER_ONLY, as well as a way to change it for scanners.

Change-Id: I3bb08c9c78271a0065c8aa8fb9b0f3301f84e828
Reviewed-on: http://gerrit.cloudera.org:8080/4837
Tested-by: Kudu Jenkins
Reviewed-by: Jean-Daniel Cryans <jd...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/a7fd4fc8
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/a7fd4fc8
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/a7fd4fc8

Branch: refs/heads/master
Commit: a7fd4fc85c0878570a5ab51a8e28275fd4459ff5
Parents: a326fc9
Author: Jean-Daniel Cryans <jd...@apache.org>
Authored: Tue Oct 25 09:51:42 2016 -0700
Committer: Jean-Daniel Cryans <jd...@apache.org>
Committed: Thu Oct 27 18:40:35 2016 +0000

----------------------------------------------------------------------
 .../kudu/client/AbstractKuduScannerBuilder.java | 12 +++++++
 .../org/apache/kudu/client/AsyncKuduClient.java | 17 +++++----
 .../apache/kudu/client/AsyncKuduScanner.java    | 21 +++++++-----
 .../java/org/apache/kudu/client/KuduRpc.java    |  4 +++
 .../org/apache/kudu/client/KuduScanner.java     |  2 +-
 .../org/apache/kudu/client/RemoteTablet.java    | 17 +++++++++
 .../apache/kudu/client/ReplicaSelection.java    | 36 ++++++++++++++++++++
 .../apache/kudu/client/TestRemoteTablet.java    |  8 +++++
 .../kudu/client/TestScannerMultiTablet.java     | 15 ++++++++
 9 files changed, 115 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/a7fd4fc8/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java
index c23e07e..f58ae41 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java
@@ -54,6 +54,7 @@ public abstract class AbstractKuduScannerBuilder
   List<String> projectedColumnNames = null;
   List<Integer> projectedColumnIndexes = null;
   long scanRequestTimeout;
+  ReplicaSelection replicaSelection = ReplicaSelection.LEADER_ONLY;
 
   AbstractKuduScannerBuilder(AsyncKuduClient client, KuduTable table) {
     this.client = client;
@@ -313,6 +314,17 @@ public abstract class AbstractKuduScannerBuilder
   }
 
   /**
+   * Sets the replica selection mechanism for this scanner. The default is to read from the
+   * currently known leader.
+   * @param replicaSelection replica selection mechanism to use
+   * @return this instance
+   */
+  public S replicaSelection(ReplicaSelection replicaSelection) {
+    this.replicaSelection = replicaSelection;
+    return (S) this;
+  }
+
+  /**
    * Set an encoded (inclusive) start partition key for the scan.
    *
    * @param partitionKey the encoded partition key

http://git-wip-us.apache.org/repos/asf/kudu/blob/a7fd4fc8/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index b36737b..35cd9d6 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -586,8 +586,9 @@ public class AsyncKuduClient implements AutoCloseable {
   Deferred<AsyncKuduScanner.Response> scanNextRows(final AsyncKuduScanner scanner) {
     RemoteTablet tablet = scanner.currentTablet();
     assert (tablet != null);
-    TabletClient client = connectionCache.getClient(tablet.getLeaderUUID());
     KuduRpc<AsyncKuduScanner.Response> nextRequest = scanner.getNextRowsRequest();
+    TabletClient client =
+        connectionCache.getClient(tablet.getReplicaSelectedUUID(nextRequest.getReplicaSelection()));
     Deferred<AsyncKuduScanner.Response> d = nextRequest.getDeferred();
     // Important to increment the attempts before the next if statement since
     // getSleepTimeForRpc() relies on it if the client is null or dead.
@@ -615,7 +616,9 @@ public class AsyncKuduClient implements AutoCloseable {
       return Deferred.fromResult(null);
     }
 
-    final TabletClient client = connectionCache.getClient(tablet.getLeaderUUID());
+    final KuduRpc<AsyncKuduScanner.Response>  closeRequest = scanner.getCloseRequest();
+    final TabletClient client = connectionCache.getClient(
+        tablet.getReplicaSelectedUUID(closeRequest.getReplicaSelection()));
     if (client == null || !client.isAlive()) {
       // Oops, we couldn't find a tablet server that hosts this tablet. Our
       // cache was probably invalidated while the client was scanning. So
@@ -623,10 +626,10 @@ public class AsyncKuduClient implements AutoCloseable {
       LOG.warn("Cannot close {} properly, no connection open for {}", scanner, tablet);
       return Deferred.fromResult(null);
     }
-    final KuduRpc<AsyncKuduScanner.Response>  close_request = scanner.getCloseRequest();
-    final Deferred<AsyncKuduScanner.Response> d = close_request.getDeferred();
-    close_request.attempt++;
-    client.sendRpc(close_request);
+
+    final Deferred<AsyncKuduScanner.Response> d = closeRequest.getDeferred();
+    closeRequest.attempt++;
+    client.sendRpc(closeRequest);
     return d;
   }
 
@@ -670,7 +673,7 @@ public class AsyncKuduClient implements AutoCloseable {
     // If we found a tablet, we'll try to find the TS to talk to.
     if (entry != null) {
       RemoteTablet tablet = entry.getTablet();
-      String uuid = tablet.getLeaderUUID();
+      String uuid = tablet.getReplicaSelectedUUID(request.getReplicaSelection());
       if (uuid != null) {
         Deferred<R> d = request.getDeferred();
         request.setTablet(tablet);

http://git-wip-us.apache.org/repos/asf/kudu/blob/a7fd4fc8/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
index c61fbdb..b51394a 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
@@ -178,6 +178,8 @@ public final class AsyncKuduScanner {
 
   private final long htTimestamp;
 
+  private final ReplicaSelection replicaSelection;
+
   /////////////////////
   // Runtime variables.
   /////////////////////
@@ -212,8 +214,6 @@ public final class AsyncKuduScanner {
 
   private Deferred<RowResultIterator> prefetcherDeferred;
 
-  private boolean inFirstTablet = true;
-
   final long scanRequestTimeout;
 
   AsyncKuduScanner(AsyncKuduClient client, KuduTable table, List<String> projectedNames,
@@ -222,7 +222,8 @@ public final class AsyncKuduScanner {
                    Map<String, KuduPredicate> predicates, long limit,
                    boolean cacheBlocks, boolean prefetching,
                    byte[] startPrimaryKey, byte[] endPrimaryKey,
-                   long htTimestamp, int batchSizeBytes, PartitionPruner pruner) {
+                   long htTimestamp, int batchSizeBytes, PartitionPruner pruner,
+                   ReplicaSelection replicaSelection) {
     checkArgument(batchSizeBytes > 0, "Need a strictly positive number of bytes, " +
         "got %s", batchSizeBytes);
     checkArgument(limit > 0, "Need a strictly positive number for the limit, " +
@@ -280,6 +281,8 @@ public final class AsyncKuduScanner {
       this.hasMore = false;
       this.closed = true;
     }
+
+    this.replicaSelection = replicaSelection;
   }
 
   /**
@@ -586,11 +589,6 @@ public final class AsyncKuduScanner {
    */
   KuduRpc<Response> getOpenRequest() {
     checkScanningNotStarted();
-    // This is the only point where we know we haven't started scanning and where the scanner
-    // should be fully configured
-    if (this.inFirstTablet) {
-      this.inFirstTablet = false;
-    }
     return new ScanRequest(table, State.OPENING);
   }
 
@@ -684,6 +682,11 @@ public final class AsyncKuduScanner {
       }
     }
 
+    @Override
+    ReplicaSelection getReplicaSelection() {
+      return replicaSelection;
+    }
+
     /** Serializes this request.  */
     ChannelBuffer serialize(Message header) {
       final ScanRequestPB.Builder builder = ScanRequestPB.newBuilder();
@@ -818,7 +821,7 @@ public final class AsyncKuduScanner {
           client, table, projectedColumnNames, projectedColumnIndexes, readMode, orderMode,
           scanRequestTimeout, predicates, limit, cacheBlocks,
           prefetching, lowerBoundPrimaryKey, upperBoundPrimaryKey,
-          htTimestamp, batchSizeBytes, PartitionPruner.create(this));
+          htTimestamp, batchSizeBytes, PartitionPruner.create(this), replicaSelection);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/a7fd4fc8/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
index 02b547e..8b5c0df 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
@@ -262,6 +262,10 @@ public abstract class KuduRpc<R> {
     return sequenceId;
   }
 
+  ReplicaSelection getReplicaSelection() {
+    return ReplicaSelection.LEADER_ONLY;
+  }
+
   void setSequenceId(long sequenceId) {
     assert (this.sequenceId == RequestTracker.NO_SEQ_NO);
     this.sequenceId = sequenceId;

http://git-wip-us.apache.org/repos/asf/kudu/blob/a7fd4fc8/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
index 4a4bcc1..1cccd10 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
@@ -142,7 +142,7 @@ public class KuduScanner {
           client, table, projectedColumnNames, projectedColumnIndexes, readMode, orderMode,
           scanRequestTimeout, predicates, limit, cacheBlocks,
           prefetching, lowerBoundPrimaryKey, upperBoundPrimaryKey,
-          htTimestamp, batchSizeBytes, PartitionPruner.create(this)));
+          htTimestamp, batchSizeBytes, PartitionPruner.create(this), replicaSelection));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/a7fd4fc8/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
index 28be0fe..d98f21d 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
@@ -165,6 +165,23 @@ class RemoteTablet implements Comparable<RemoteTablet> {
   }
 
   /**
+   * Helper function to centralize the calling of methods based on the passed replica selection
+   * mechanism.
+   * @param replicaSelection replica selection mechanism to use
+   * @return a UUID for the server that matches the selection, can be null
+   */
+  String getReplicaSelectedUUID(ReplicaSelection replicaSelection) {
+    switch (replicaSelection) {
+      case LEADER_ONLY:
+        return getLeaderUUID();
+      case CLOSEST_REPLICA:
+        return getClosestUUID();
+      default:
+        throw new RuntimeException("Unknown replica selection mechanism " + replicaSelection);
+    }
+  }
+
+  /**
    * Gets the replicas of this tablet. The returned list may not be mutated.
    * @return the replicas of the tablet
    */

http://git-wip-us.apache.org/repos/asf/kudu/blob/a7fd4fc8/java/kudu-client/src/main/java/org/apache/kudu/client/ReplicaSelection.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ReplicaSelection.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ReplicaSelection.java
new file mode 100644
index 0000000..e33ba34
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ReplicaSelection.java
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.client;
+
+import org.apache.kudu.annotations.InterfaceAudience;
+import org.apache.kudu.annotations.InterfaceStability;
+
+/**
+ * Policy with which to choose amongst multiple replicas.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public enum ReplicaSelection {
+  /**
+   * Select the LEADER replica.
+   */
+  LEADER_ONLY,
+  /**
+   * Select the closest replica to the client, or a random one if all replicas are equidistant.
+   */
+  CLOSEST_REPLICA
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/a7fd4fc8/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java
index aa50ad6..ea77c27 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java
@@ -101,6 +101,14 @@ public class TestRemoteTablet {
     assertNotNull(tablet.getClosestUUID());
   }
 
+  @Test
+  public void testReplicaSelection() {
+    RemoteTablet tablet = getTablet(0, 1);
+
+    assertEquals("0", tablet.getReplicaSelectedUUID(ReplicaSelection.LEADER_ONLY));
+    assertEquals("1", tablet.getReplicaSelectedUUID(ReplicaSelection.CLOSEST_REPLICA));
+  }
+
   private RemoteTablet getTablet(int leaderIndex) {
     return getTablet(leaderIndex, -1);
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/a7fd4fc8/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java
index 71739d4..14454c0 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScannerMultiTablet.java
@@ -176,6 +176,21 @@ public class TestScannerMultiTablet extends BaseKuduTest {
     buildScannerAndCheckColumnsCount(builder, 2);
   }
 
+  @Test(timeout = 100000)
+  public void testReplicaSelections() throws Exception {
+    AsyncKuduScanner scanner = client.newScannerBuilder(table)
+        .replicaSelection(ReplicaSelection.LEADER_ONLY)
+        .build();
+
+    assertEquals(9, countRowsInScan(scanner));
+
+    scanner = client.newScannerBuilder(table)
+        .replicaSelection(ReplicaSelection.CLOSEST_REPLICA)
+        .build();
+
+    assertEquals(9, countRowsInScan(scanner));
+  }
+
   private AsyncKuduScanner getScanner(String lowerBoundKeyOne,
                                       String lowerBoundKeyTwo,
                                       String exclusiveUpperBoundKeyOne,