You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by gr...@apache.org on 2019/01/10 23:05:20 UTC

[kudu] 04/09: Support location awareness in READ_CLOSEST for the Java client

This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 7274710bb9210ae812619545540ccf08b052d3c8
Author: Will Berkeley <wd...@gmail.org>
AuthorDate: Wed Dec 26 16:16:53 2018 -0500

    Support location awareness in READ_CLOSEST for the Java client
    
    Change-Id: Ief0f07058cefd0037f4b0f7c60c8b7809dc8313f
    Reviewed-on: http://gerrit.cloudera.org:8080/12175
    Tested-by: Kudu Jenkins
    Reviewed-by: Grant Henke <gr...@apache.org>
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
 .../org/apache/kudu/client/AsyncKuduClient.java    |  14 ++-
 .../java/org/apache/kudu/client/RemoteTablet.java  |  41 +++++++--
 .../org/apache/kudu/client/ReplicaSelection.java   |   6 +-
 .../java/org/apache/kudu/client/ServerInfo.java    |  14 ++-
 .../apache/kudu/client/ITScannerMultiTablet.java   |   2 +-
 .../org/apache/kudu/client/TestRemoteTablet.java   | 102 +++++++++++++++++----
 .../kudu/client/TestTableLocationsCache.java       |   2 +-
 7 files changed, 143 insertions(+), 38 deletions(-)

diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index e71293b..4ed25f6 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -515,7 +515,7 @@ public class AsyncKuduClient implements AutoCloseable {
   /**
    * Returns a string representation of this client's location. If this
    * client was not assigned a location, returns the empty string.
-   * 
+   *
    * @return a string representation of this client's location
    */
   public String getLocationString() {
@@ -1111,7 +1111,8 @@ public class AsyncKuduClient implements AutoCloseable {
     // Important to increment the attempts before the next if statement since
     // getSleepTimeForRpc() relies on it if the client is null or dead.
     nextRequest.attempt++;
-    final ServerInfo info = tablet.getReplicaSelectedServerInfo(nextRequest.getReplicaSelection());
+    final ServerInfo info = tablet.getReplicaSelectedServerInfo(nextRequest.getReplicaSelection(),
+                                                                location);
     if (info == null) {
       return delayedSendRpcToTablet(nextRequest, new RecoverableException(Status.RemoteError(
           String.format("No information on servers hosting tablet %s, will retry later",
@@ -1137,7 +1138,8 @@ public class AsyncKuduClient implements AutoCloseable {
       return Deferred.fromResult(null);
     }
     final KuduRpc<AsyncKuduScanner.Response> closeRequest = scanner.getCloseRequest();
-    final ServerInfo info = tablet.getReplicaSelectedServerInfo(closeRequest.getReplicaSelection());
+    final ServerInfo info = tablet.getReplicaSelectedServerInfo(closeRequest.getReplicaSelection(),
+                                                                location);
     if (info == null) {
       return Deferred.fromResult(null);
     }
@@ -1165,7 +1167,8 @@ public class AsyncKuduClient implements AutoCloseable {
     }
 
     final KuduRpc<Void> keepAliveRequest = scanner.getKeepAliveRequest();
-    final ServerInfo info = tablet.getReplicaSelectedServerInfo(keepAliveRequest.getReplicaSelection());
+    final ServerInfo info = tablet.getReplicaSelectedServerInfo(keepAliveRequest.getReplicaSelection(),
+                                                                location);
     if (info == null) {
       return Deferred.fromResult(null);
     }
@@ -1218,7 +1221,8 @@ public class AsyncKuduClient implements AutoCloseable {
     // If we found a tablet, we'll try to find the TS to talk to.
     if (entry != null) {
       RemoteTablet tablet = entry.getTablet();
-      ServerInfo info = tablet.getReplicaSelectedServerInfo(request.getReplicaSelection());
+      ServerInfo info = tablet.getReplicaSelectedServerInfo(request.getReplicaSelection(),
+                                                            location);
       if (info != null) {
         Deferred<R> d = request.getDeferred();
         request.setTablet(tablet);
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
index ed8c5cc..9d77275 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
@@ -172,25 +172,45 @@ public class RemoteTablet implements Comparable<RemoteTablet> {
   }
 
   /**
-   * Get the information on the closest server. If none is closer than the others,
-   * return the information on a randomly picked server.
+   * Get the information on the closest server. Servers are ranked from closest to furthest as
+   * follows:
+   * - Local servers
+   * - Servers in the same location as the client
+   * - All other servers
    *
-   * @return the information on the closest server, which might be any if none is closer, or null
-   *   if this cache doesn't know any servers.
+   * @param location the location of the client
+   * @return the information for a closest server, or null if this cache doesn't know any servers.
    */
   @Nullable
-  ServerInfo getClosestServerInfo() {
+  ServerInfo getClosestServerInfo(String location) {
+    // TODO(KUDU-2348) this doesn't return a random server, but rather returns
+    // 1. whichever local server's hashcode places it first among local servers,
+    //    if there is a local server, or
+    // 2. whichever server in the same location has a hashcode that places it
+    //    last among servers in the same location, if there is a server in the
+    //    same location, or, finally,
+    // 3. whichever server's hashcode places it last.
+    // That might be the same "random" choice across all clients, which is not
+    // so good. Unfortunately, the client depends on this method returning the
+    // same tablet server given the same state. See
+    // testGetReplicaSelectedServerInfoDeterminism in TestRemoteTablet.java.
+    // TODO(wdberkeley): Eventually, the client might use the hierarchical
+    // structure of a location to determine proximity.
     synchronized (tabletServers) {
       ServerInfo last = null;
+      ServerInfo lastInSameLocation = null;
       for (ServerInfo e : tabletServers.values()) {
         last = e;
         if (e.isLocal()) {
           return e;
         }
+        if (e.inSameLocation(location)) {
+          lastInSameLocation = e;
+        }
+      }
+      if (lastInSameLocation != null) {
+        return lastInSameLocation;
       }
-      // TODO(KUDU-2348) this doesn't return a random server, but rather returns
-      // whichever one's hashcode places it last. That might be the same
-      // "random" choice across all clients, which is not so good.
       return last;
     }
   }
@@ -200,15 +220,16 @@ public class RemoteTablet implements Comparable<RemoteTablet> {
    * mechanism.
    *
    * @param replicaSelection replica selection mechanism to use
+   * @param location the location of the client
    * @return information on the server that matches the selection, can be null
    */
   @Nullable
-  ServerInfo getReplicaSelectedServerInfo(ReplicaSelection replicaSelection) {
+  ServerInfo getReplicaSelectedServerInfo(ReplicaSelection replicaSelection, String location) {
     switch (replicaSelection) {
       case LEADER_ONLY:
         return getLeaderServerInfo();
       case CLOSEST_REPLICA:
-        return getClosestServerInfo();
+        return getClosestServerInfo(location);
       default:
         throw new RuntimeException("unknown replica selection mechanism " + replicaSelection);
     }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ReplicaSelection.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ReplicaSelection.java
index 8fc0ebf..def81d3 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ReplicaSelection.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ReplicaSelection.java
@@ -31,7 +31,11 @@ public enum ReplicaSelection {
    */
   LEADER_ONLY,
   /**
-   * Select the closest replica to the client, or a random one if all replicas are equidistant.
+   * Select the closest replica to the client. Replicas are classified from closest to furthest as
+   * follows:
+   * - Local replicas
+   * - Replicas whose tablet server has the same location as the client
+   * - All other replicas
    */
   CLOSEST_REPLICA
 }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ServerInfo.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ServerInfo.java
index 67b2963..cad4b21 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ServerInfo.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ServerInfo.java
@@ -94,14 +94,24 @@ public class ServerInfo {
   }
 
   /**
-   * Returns this server's location.
-   * @return the server's location, or the empty string if no location was assigned.
+   * Returns this server's location. If no location is assigned, returns an empty string.
+   * @return the server's location
    */
   public String getLocation() {
     return location;
   }
 
   /**
+   * Returns true if the server is in the same location as 'loc', which must be non-null.
+   * @return true if 'loc' is non-empty and equal to this server's location.
+   */
+  public boolean inSameLocation(String loc) {
+    Preconditions.checkNotNull(loc);
+    return !loc.isEmpty() &&
+           loc.equals(location);
+  }
+
+  /**
    * Returns if this server is on this client's host.
    * @return true if the server is local, else false
    */
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java b/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java
index 2a5c0fc..58ed5f3 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java
@@ -177,7 +177,7 @@ public class ITScannerMultiTablet {
       // Forcefully disconnects the current connection and fails all outstanding RPCs
       // in the middle of scanning.
       harness.getAsyncClient().newRpcProxy(scanner.currentTablet().getReplicaSelectedServerInfo(
-          scanner.getReplicaSelection())).getConnection().disconnect();
+          scanner.getReplicaSelection(), /* location= */"")).getConnection().disconnect();
 
       while (scanner.hasMoreRows()) {
         loopCount++;
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java
index f1c09ab..368ecbc 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java
@@ -35,6 +35,9 @@ import org.apache.kudu.consensus.Metadata;
 import org.apache.kudu.master.Master;
 
 public class TestRemoteTablet {
+  private static final String kClientLocation = "/fake-client";
+  private static final String kLocation = "/fake-noclient";
+  private static final String kNoLocation = "";
   private static final String[] kUuids = { "uuid-0", "uuid-1", "uuid-2" };
 
   @Test
@@ -97,27 +100,77 @@ public class TestRemoteTablet {
 
   @Test
   public void testLocalReplica() {
-    RemoteTablet tablet = getTablet(0, 0);
+    {
+      // Tablet with no replicas in the same location as the client.
+      RemoteTablet tablet = getTablet(0, 0, -1);
 
-    assertEquals(kUuids[0], tablet.getClosestServerInfo().getUuid());
+      // No location for the client.
+      assertEquals(kUuids[0], tablet.getClosestServerInfo(kNoLocation).getUuid());
+
+      // Client with location.
+      assertEquals(kUuids[0], tablet.getClosestServerInfo(kClientLocation).getUuid());
+    }
+
+    {
+      // Tablet with a non-local replica in the same location as the client.
+      RemoteTablet tablet = getTablet(0, 0, 1);
+
+      // No location for the client.
+      assertEquals(kUuids[0], tablet.getClosestServerInfo(kNoLocation).getUuid());
+
+      // Client with location. The local replica should be chosen.
+      assertEquals(kUuids[0], tablet.getClosestServerInfo(kClientLocation).getUuid());
+    }
+
+    {
+      // Tablet with a local replica in the same location as the client.
+      RemoteTablet tablet = getTablet(0, 0, 0);
+
+      // No location for the client.
+      assertEquals(kUuids[0], tablet.getClosestServerInfo(kNoLocation).getUuid());
+
+      // Client with location. The local replica should be chosen.
+      assertEquals(kUuids[0], tablet.getClosestServerInfo(kClientLocation).getUuid());
+    }
   }
 
   @Test
-  public void testNoLocalReplica() {
-    RemoteTablet tablet = getTablet(0, -1);
+  public void testNoLocalOrSameLocationReplica() {
+    RemoteTablet tablet = getTablet(0, -1, -1);
 
     // We just care about getting one back.
-    assertNotNull(tablet.getClosestServerInfo().getUuid());
+    assertNotNull(tablet.getClosestServerInfo(kClientLocation).getUuid());
   }
 
   @Test
   public void testReplicaSelection() {
-    RemoteTablet tablet = getTablet(0, 1);
+    {
+      RemoteTablet tablet = getTablet(0, 1, 2);
+
+      // LEADER_ONLY picks the leader even if there's a local replica.
+      assertEquals(kUuids[0],
+          tablet.getReplicaSelectedServerInfo(ReplicaSelection.LEADER_ONLY, kClientLocation)
+              .getUuid());
+
+      // CLOSEST_REPLICA picks the local replica even if there's a replica in the same location.
+      assertEquals(kUuids[1],
+          tablet.getReplicaSelectedServerInfo(ReplicaSelection.CLOSEST_REPLICA, kClientLocation)
+              .getUuid());
+    }
+
+    {
+      RemoteTablet tablet = getTablet(0, -1, 1);
 
-    assertEquals(kUuids[0],
-        tablet.getReplicaSelectedServerInfo(ReplicaSelection.LEADER_ONLY).getUuid());
-    assertEquals(kUuids[1],
-        tablet.getReplicaSelectedServerInfo(ReplicaSelection.CLOSEST_REPLICA).getUuid());
+      // LEADER_ONLY picks the leader even if there's a replica with the same location.
+      assertEquals(kUuids[0],
+          tablet.getReplicaSelectedServerInfo(ReplicaSelection.LEADER_ONLY, kClientLocation)
+              .getUuid());
+
+      // CLOSEST_REPLICA picks the replica in the same location.
+      assertEquals(kUuids[1],
+          tablet.getReplicaSelectedServerInfo(ReplicaSelection.CLOSEST_REPLICA, kClientLocation)
+              .getUuid());
+    }
   }
 
   // AsyncKuduClient has methods like scanNextRows, keepAlive, and closeScanner that rely on
@@ -126,33 +179,45 @@ public class TestRemoteTablet {
   // This test ensures that remains true.
   @Test
   public void testGetReplicaSelectedServerInfoDeterminism() {
-    RemoteTablet tabletWithLocal = getTablet(0, 0);
+    // There's a local leader replica.
+    RemoteTablet tabletWithLocal = getTablet(0, 0, 0);
     verifyGetReplicaSelectedServerInfoDeterminism(tabletWithLocal);
 
-    RemoteTablet tabletWithRemote = getTablet(0, -1);
+    // There's a leader in the same location as the client.
+    RemoteTablet tabletWithSameLocation = getTablet(0, -1, 0);
+    verifyGetReplicaSelectedServerInfoDeterminism(tabletWithSameLocation);
+
+    // There's no local replica or replica in the same location.
+    RemoteTablet tabletWithRemote = getTablet(0, -1, -1);
     verifyGetReplicaSelectedServerInfoDeterminism(tabletWithRemote);
   }
 
   private void verifyGetReplicaSelectedServerInfoDeterminism(RemoteTablet tablet) {
-    String init = tablet.getReplicaSelectedServerInfo(ReplicaSelection.CLOSEST_REPLICA).getUuid();
+    String init = tablet
+        .getReplicaSelectedServerInfo(ReplicaSelection.CLOSEST_REPLICA, kClientLocation)
+        .getUuid();
     for (int i = 0; i < 10; i++) {
-      String next = tablet.getReplicaSelectedServerInfo(ReplicaSelection.CLOSEST_REPLICA).getUuid();
+      String next = tablet
+          .getReplicaSelectedServerInfo(ReplicaSelection.CLOSEST_REPLICA, kClientLocation)
+          .getUuid();
       assertEquals("getReplicaSelectedServerInfo was not deterministic", init, next);
     }
   }
 
   @Test
   public void testToString() {
-    RemoteTablet tablet = getTablet(0, 1);
+    RemoteTablet tablet = getTablet(0, 1, -1);
     assertEquals("fake tablet@[uuid-0(host:1000)[L],uuid-1(host:1001),uuid-2(host:1002)]",
         tablet.toString());
   }
 
   private RemoteTablet getTablet(int leaderIndex) {
-    return getTablet(leaderIndex, -1);
+    return getTablet(leaderIndex, -1, -1);
   }
 
-  static RemoteTablet getTablet(int leaderIndex, int localReplicaIndex) {
+  static RemoteTablet getTablet(int leaderIndex,
+                                int localReplicaIndex,
+                                int sameLocationReplicaIndex) {
     Master.TabletLocationsPB.Builder tabletPb = Master.TabletLocationsPB.newBuilder();
 
     tabletPb.setPartition(ProtobufUtils.getFakePartitionPB());
@@ -171,10 +236,11 @@ public class TestRemoteTablet {
       }
 
       String uuid = kUuids[i];
+      String location = i == sameLocationReplicaIndex ? kClientLocation : kLocation;
       servers.add(new ServerInfo(uuid,
                                  new HostAndPort("host", 1000 + i),
                                  addr,
-                                 /*location=*/""));
+                                 location));
       tabletPb.addReplicas(ProtobufUtils.getFakeTabletReplicaPB(
           uuid, "host", i,
           leaderIndex == i ? Metadata.RaftPeerPB.Role.LEADER : Metadata.RaftPeerPB.Role.FOLLOWER));
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestTableLocationsCache.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestTableLocationsCache.java
index d61fb8e..c9de4d3 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestTableLocationsCache.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestTableLocationsCache.java
@@ -45,7 +45,7 @@ public class TestTableLocationsCache {
 
   @Test
   public void testToString() {
-    RemoteTablet tablet = TestRemoteTablet.getTablet(0, 1);
+    RemoteTablet tablet = TestRemoteTablet.getTablet(0, 1, -1);
     List<RemoteTablet> tablets = ImmutableList.of(tablet);
     cache.cacheTabletLocations(tablets,
         tablet.getPartition().getPartitionKeyStart(),