You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by jd...@apache.org on 2016/10/24 23:18:17 UTC

kudu git commit: [java client] Identify client-local TabletClients

Repository: kudu
Updated Branches:
  refs/heads/master 1a8ce4269 -> ff24651cb


[java client] Identify client-local TabletClients

This patch adds building blocks to enable selecting the closest replica. Another
patch will be required to finish the plumbing job.

Inspired by Hadoop: https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java#L698

Change-Id: Ia1bfcbc6b6aea7cb9610e8ae934bf08ab0774ee3
Reviewed-on: http://gerrit.cloudera.org:8080/4786
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/ff24651c
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/ff24651c
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/ff24651c

Branch: refs/heads/master
Commit: ff24651cb9cb7f78ee2806b8624e61fed132b516
Parents: 1a8ce42
Author: Jean-Daniel Cryans <jd...@apache.org>
Authored: Fri Oct 21 15:30:39 2016 -0700
Committer: Jean-Daniel Cryans <jd...@apache.org>
Committed: Mon Oct 24 23:16:54 2016 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/AsyncKuduClient.java |  7 +--
 .../org/apache/kudu/client/ConnectionCache.java | 52 ++++++-------------
 .../org/apache/kudu/client/TabletClient.java    | 13 ++++-
 .../main/java/org/apache/kudu/util/NetUtil.java | 54 ++++++++++++++++++++
 .../apache/kudu/client/TestConnectionCache.java |  2 +-
 .../java/org/apache/kudu/util/TestNetUtil.java  | 12 ++++-
 6 files changed, 97 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/ff24651c/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index f455c0e..a781f9f 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -53,6 +53,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.concurrent.GuardedBy;
+import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -1405,8 +1406,8 @@ public class AsyncKuduClient implements AutoCloseable {
    * @return A live and initialized client for the specified master server.
    */
   TabletClient newMasterClient(HostAndPort masterHostPort) {
-    String ip = ConnectionCache.getIP(masterHostPort.getHostText());
-    if (ip == null) {
+    InetAddress inetAddress = NetUtil.getInetAddress((masterHostPort.getHostText()));
+    if (inetAddress == null) {
       return null;
     }
     // We should pass a UUID here but we have a chicken and egg problem, we first need to
@@ -1415,7 +1416,7 @@ public class AsyncKuduClient implements AutoCloseable {
     // host and port which is enough to identify the node we're connecting to.
     return connectionCache.newClient(
         MASTER_TABLE_NAME_PLACEHOLDER + " - " + masterHostPort.toString(),
-        ip, masterHostPort.getPort());
+        inetAddress, masterHostPort.getPort());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kudu/blob/ff24651c/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
index 9a885fb..ed6100e 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
@@ -23,6 +23,7 @@ import org.apache.kudu.Common;
 import org.apache.kudu.annotations.InterfaceAudience;
 import org.apache.kudu.annotations.InterfaceStability;
 import org.apache.kudu.master.Master;
+import org.apache.kudu.util.NetUtil;
 import org.jboss.netty.channel.DefaultChannelPipeline;
 import org.jboss.netty.channel.socket.SocketChannel;
 import org.jboss.netty.channel.socket.SocketChannelConfig;
@@ -104,15 +105,21 @@ class ConnectionCache {
     // from meta_cache.cc
     // TODO: if the TS advertises multiple host/ports, pick the right one
     // based on some kind of policy. For now just use the first always.
-    String ip = getIP(addresses.get(0).getHost());
-    if (ip == null) {
+    InetAddress inetAddress = NetUtil.getInetAddress(addresses.get(0).getHost());
+    if (inetAddress == null) {
       throw new UnknownHostException(
           "Failed to resolve the IP of `" + addresses.get(0).getHost() + "'");
     }
-    newClient(uuid, ip, addresses.get(0).getPort());
+    newClient(uuid, inetAddress, addresses.get(0).getPort());
   }
 
-  TabletClient newClient(String uuid, final String host, final int port) {
+  TabletClient newClient(String uuid, InetAddress inetAddress, int port) {
+    String host = inetAddress.getHostAddress();
+    boolean isLocal = NetUtil.isLocalAddress(inetAddress);
+    return newClient(uuid, host, port, isLocal);
+  }
+
+  TabletClient newClient(String uuid, String host, int port, boolean isLocal) {
     TabletClient client;
     SocketChannel chan;
 
@@ -123,7 +130,7 @@ class ConnectionCache {
         return client;
       }
       final TabletClientPipeline pipeline = new TabletClientPipeline();
-      client = pipeline.init(uuid, host, port);
+      client = pipeline.init(uuid, host, port, isLocal);
       chan = this.kuduClient.getChannelFactory().newChannel(pipeline);
       uuid2client.put(uuid, client);
     } finally {
@@ -159,7 +166,7 @@ class ConnectionCache {
 
   /**
    * Get a connection to a server for the given UUID. This method will automatically call
-   * {@link #newClient(String, String, int)} if the cached connection is down.
+   * {@link #newClient(String, InetAddress, int)} if the cached connection is down.
    * @param uuid server's identifier
    * @return a connection to a server, or null if the passed UUID isn't known
    */
@@ -171,7 +178,7 @@ class ConnectionCache {
     } else if (client.isAlive()) {
       return client;
     } else {
-      return newClient(uuid, client.getHost(), client.getPort());
+      return newClient(uuid, client.getHost(), client.getPort(), client.isLocal());
     }
   }
 
@@ -226,38 +233,11 @@ class ConnectionCache {
     return true;
   }
 
-  /**
-   * Gets a hostname or an IP address and returns the textual representation
-   * of the IP address.
-   * <p>
-   * <strong>This method can block</strong> as there is no API for
-   * asynchronous DNS resolution in the JDK.
-   * @param host the hostname to resolve
-   * @return the IP address associated with the given hostname,
-   * or {@code null} if the address couldn't be resolved
-   */
-  static String getIP(final String host) {
-    final long start = System.nanoTime();
-    try {
-      final String ip = InetAddress.getByName(host).getHostAddress();
-      final long latency = System.nanoTime() - start;
-      if (latency > 500000/*ns*/ && LOG.isDebugEnabled()) {
-        LOG.debug("Resolved IP of `{}' to {} in {}ns", host, ip, latency);
-      } else if (latency >= 3000000/*ns*/) {
-        LOG.warn("Slow DNS lookup! Resolved IP of `{}' to {} in {}ns", host, ip, latency);
-      }
-      return ip;
-    } catch (UnknownHostException e) {
-      LOG.error("Failed to resolve the IP of `{}' in {}ns", host, (System.nanoTime() - start));
-      return null;
-    }
-  }
-
   private final class TabletClientPipeline extends DefaultChannelPipeline {
 
-    TabletClient init(String uuid, String host, int port) {
+    TabletClient init(String uuid, String host, int port, boolean isLocal) {
       AsyncKuduClient kuduClient = ConnectionCache.this.kuduClient;
-      final TabletClient client = new TabletClient(kuduClient, uuid, host, port);
+      final TabletClient client = new TabletClient(kuduClient, uuid, host, port, isLocal);
       if (kuduClient.getDefaultSocketReadTimeoutMs() > 0) {
         super.addLast("timeout-handler",
             new ReadTimeoutHandler(kuduClient.getTimer(),

http://git-wip-us.apache.org/repos/asf/kudu/blob/ff24651c/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
index f5bb44c..0b190ed 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
@@ -149,13 +149,16 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
   // differently by also clearing the caches.
   private volatile boolean gotUncaughtException = false;
 
-  public TabletClient(AsyncKuduClient client, String uuid, String host, int port) {
+  private final boolean local;
+
+  public TabletClient(AsyncKuduClient client, String uuid, String host, int port, boolean local) {
     this.kuduClient = client;
     this.uuid = uuid;
     this.socketReadTimeoutMs = client.getDefaultSocketReadTimeoutMs();
     this.host = host;
     this.port = port;
     this.requestTracker = client.getRequestTracker();
+    this.local = local;
   }
 
   <R> void sendRpc(KuduRpc<R> rpc) {
@@ -841,6 +844,14 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
   }
 
   /**
+   * Returns if this server is on this client's host.
+   * @return true if the server is local, else false
+   */
+  boolean isLocal() {
+    return local;
+  }
+
+  /**
    * Returns this tablet server's port.
    * @return a port number that this tablet server is bound to
    */

http://git-wip-us.apache.org/repos/asf/kudu/blob/ff24651c/java/kudu-client/src/main/java/org/apache/kudu/util/NetUtil.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/util/NetUtil.java b/java/kudu-client/src/main/java/org/apache/kudu/util/NetUtil.java
index 589cb8f..4ca6d8c 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/util/NetUtil.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/util/NetUtil.java
@@ -23,7 +23,13 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.net.HostAndPort;
 import org.apache.kudu.annotations.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.net.UnknownHostException;
 import java.util.List;
 
 /**
@@ -32,6 +38,8 @@ import java.util.List;
 @InterfaceAudience.Private
 public class NetUtil {
 
+  private static final Logger LOG = LoggerFactory.getLogger(NetUtil.class);
+
   /**
    * Convert a list of {@link HostAndPort} objects to a comma separate string.
    * The inverse of {@link #parseStrings(String, int)}.
@@ -75,4 +83,50 @@ public class NetUtil {
     }
     return hostsAndPorts;
   }
+
+  /**
+   * Gets a hostname or an IP address and returns an InetAddress.
+   * <p>
+   * <strong>This method can block</strong> as there is no API for
+   * asynchronous DNS resolution in the JDK.
+   * @param host the hostname to resolve
+   * @return an InetAddress for the given hostname,
+   * or {@code null} if the address couldn't be resolved
+   */
+  public static InetAddress getInetAddress(final String host) {
+    final long start = System.nanoTime();
+    try {
+      InetAddress ip = InetAddress.getByName(host);
+      long latency = System.nanoTime() - start;
+      if (latency > 500000/*ns*/ && LOG.isDebugEnabled()) {
+        LOG.debug("Resolved IP of `{}' to {} in {}ns", host, ip, latency);
+      } else if (latency >= 3000000/*ns*/) {
+        LOG.warn("Slow DNS lookup! Resolved IP of `{}' to {} in {}ns", host, ip, latency);
+      }
+      return ip;
+    } catch (UnknownHostException e) {
+      LOG.error("Failed to resolve the IP of `{}' in {}ns", host, (System.nanoTime() - start));
+      return null;
+    }
+  }
+
+  /**
+   * Given an InetAddress, checks to see if the address is a local address, by
+   * comparing the address with all the interfaces on the node.
+   * @param addr address to check if it is local node's address
+   * @return true if the address corresponds to the local node
+   */
+  public static boolean isLocalAddress(InetAddress addr) {
+    // Check if the address is any local or loopback.
+    boolean local = addr.isAnyLocalAddress() || addr.isLoopbackAddress();
+
+    // Check if the address is defined on any interface.
+    if (!local) {
+      try {
+        local = NetworkInterface.getByInetAddress(addr) != null;
+      } catch (SocketException e) {
+      }
+    }
+    return local;
+  }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/ff24651c/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
index 3776d50..e238069 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
@@ -39,7 +39,7 @@ public class TestConnectionCache {
       ConnectionCache cache = new ConnectionCache(client);
       int i = 0;
       for (HostAndPort hp : addresses) {
-        TabletClient conn = cache.newClient(i + "", hp.getHostText(), hp.getPort());
+        TabletClient conn = cache.newClient(i + "", hp.getHostText(), hp.getPort(), false);
         // Ping the process so we go through the whole connection process.
         pingConnection(conn);
         i++;

http://git-wip-us.apache.org/repos/asf/kudu/blob/ff24651c/java/kudu-client/src/test/java/org/apache/kudu/util/TestNetUtil.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/util/TestNetUtil.java b/java/kudu-client/src/test/java/org/apache/kudu/util/TestNetUtil.java
index cc88a3f..aa9f4b1 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/util/TestNetUtil.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/util/TestNetUtil.java
@@ -19,11 +19,11 @@ package org.apache.kudu.util;
 import com.google.common.net.HostAndPort;
 import org.junit.Test;
 
+import java.net.InetAddress;
 import java.util.Arrays;
 import java.util.List;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
 
 /**
  * Test for {@link NetUtil}.
@@ -70,4 +70,12 @@ public class TestNetUtil {
     );
     assertEquals(NetUtil.hostsAndPortsToString(hostsAndPorts), "127.0.0.1:1111,1.2.3.4.5:0");
   }
+
+  @Test
+  public void testLocal() throws Exception {
+    assertTrue(NetUtil.isLocalAddress(NetUtil.getInetAddress("localhost")));
+    assertTrue(NetUtil.isLocalAddress(NetUtil.getInetAddress("127.0.0.1")));
+    assertTrue(NetUtil.isLocalAddress(InetAddress.getLocalHost()));
+    assertFalse(NetUtil.isLocalAddress(NetUtil.getInetAddress("kudu.apache.org")));
+  }
 }