You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by jd...@apache.org on 2016/10/24 23:18:17 UTC
kudu git commit: [java client] Identify client-local TabletClients
Repository: kudu
Updated Branches:
refs/heads/master 1a8ce4269 -> ff24651cb
[java client] Identify client-local TabletClients
This patch adds building blocks to enable selecting the closest replica. Another
patch will be required to finish the plumbing job.
Inspired by Hadoop: https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java#L698
Change-Id: Ia1bfcbc6b6aea7cb9610e8ae934bf08ab0774ee3
Reviewed-on: http://gerrit.cloudera.org:8080/4786
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/ff24651c
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/ff24651c
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/ff24651c
Branch: refs/heads/master
Commit: ff24651cb9cb7f78ee2806b8624e61fed132b516
Parents: 1a8ce42
Author: Jean-Daniel Cryans <jd...@apache.org>
Authored: Fri Oct 21 15:30:39 2016 -0700
Committer: Jean-Daniel Cryans <jd...@apache.org>
Committed: Mon Oct 24 23:16:54 2016 +0000
----------------------------------------------------------------------
.../org/apache/kudu/client/AsyncKuduClient.java | 7 +--
.../org/apache/kudu/client/ConnectionCache.java | 52 ++++++-------------
.../org/apache/kudu/client/TabletClient.java | 13 ++++-
.../main/java/org/apache/kudu/util/NetUtil.java | 54 ++++++++++++++++++++
.../apache/kudu/client/TestConnectionCache.java | 2 +-
.../java/org/apache/kudu/util/TestNetUtil.java | 12 ++++-
6 files changed, 97 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/ff24651c/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index f455c0e..a781f9f 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -53,6 +53,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.GuardedBy;
+import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -1405,8 +1406,8 @@ public class AsyncKuduClient implements AutoCloseable {
* @return A live and initialized client for the specified master server.
*/
TabletClient newMasterClient(HostAndPort masterHostPort) {
- String ip = ConnectionCache.getIP(masterHostPort.getHostText());
- if (ip == null) {
+ InetAddress inetAddress = NetUtil.getInetAddress((masterHostPort.getHostText()));
+ if (inetAddress == null) {
return null;
}
// We should pass a UUID here but we have a chicken and egg problem, we first need to
@@ -1415,7 +1416,7 @@ public class AsyncKuduClient implements AutoCloseable {
// host and port which is enough to identify the node we're connecting to.
return connectionCache.newClient(
MASTER_TABLE_NAME_PLACEHOLDER + " - " + masterHostPort.toString(),
- ip, masterHostPort.getPort());
+ inetAddress, masterHostPort.getPort());
}
/**
http://git-wip-us.apache.org/repos/asf/kudu/blob/ff24651c/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
index 9a885fb..ed6100e 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
@@ -23,6 +23,7 @@ import org.apache.kudu.Common;
import org.apache.kudu.annotations.InterfaceAudience;
import org.apache.kudu.annotations.InterfaceStability;
import org.apache.kudu.master.Master;
+import org.apache.kudu.util.NetUtil;
import org.jboss.netty.channel.DefaultChannelPipeline;
import org.jboss.netty.channel.socket.SocketChannel;
import org.jboss.netty.channel.socket.SocketChannelConfig;
@@ -104,15 +105,21 @@ class ConnectionCache {
// from meta_cache.cc
// TODO: if the TS advertises multiple host/ports, pick the right one
// based on some kind of policy. For now just use the first always.
- String ip = getIP(addresses.get(0).getHost());
- if (ip == null) {
+ InetAddress inetAddress = NetUtil.getInetAddress(addresses.get(0).getHost());
+ if (inetAddress == null) {
throw new UnknownHostException(
"Failed to resolve the IP of `" + addresses.get(0).getHost() + "'");
}
- newClient(uuid, ip, addresses.get(0).getPort());
+ newClient(uuid, inetAddress, addresses.get(0).getPort());
}
- TabletClient newClient(String uuid, final String host, final int port) {
+ TabletClient newClient(String uuid, InetAddress inetAddress, int port) {
+ String host = inetAddress.getHostAddress();
+ boolean isLocal = NetUtil.isLocalAddress(inetAddress);
+ return newClient(uuid, host, port, isLocal);
+ }
+
+ TabletClient newClient(String uuid, String host, int port, boolean isLocal) {
TabletClient client;
SocketChannel chan;
@@ -123,7 +130,7 @@ class ConnectionCache {
return client;
}
final TabletClientPipeline pipeline = new TabletClientPipeline();
- client = pipeline.init(uuid, host, port);
+ client = pipeline.init(uuid, host, port, isLocal);
chan = this.kuduClient.getChannelFactory().newChannel(pipeline);
uuid2client.put(uuid, client);
} finally {
@@ -159,7 +166,7 @@ class ConnectionCache {
/**
* Get a connection to a server for the given UUID. This method will automatically call
- * {@link #newClient(String, String, int)} if the cached connection is down.
+ * {@link #newClient(String, InetAddress, int)} if the cached connection is down.
* @param uuid server's identifier
* @return a connection to a server, or null if the passed UUID isn't known
*/
@@ -171,7 +178,7 @@ class ConnectionCache {
} else if (client.isAlive()) {
return client;
} else {
- return newClient(uuid, client.getHost(), client.getPort());
+ return newClient(uuid, client.getHost(), client.getPort(), client.isLocal());
}
}
@@ -226,38 +233,11 @@ class ConnectionCache {
return true;
}
- /**
- * Gets a hostname or an IP address and returns the textual representation
- * of the IP address.
- * <p>
- * <strong>This method can block</strong> as there is no API for
- * asynchronous DNS resolution in the JDK.
- * @param host the hostname to resolve
- * @return the IP address associated with the given hostname,
- * or {@code null} if the address couldn't be resolved
- */
- static String getIP(final String host) {
- final long start = System.nanoTime();
- try {
- final String ip = InetAddress.getByName(host).getHostAddress();
- final long latency = System.nanoTime() - start;
- if (latency > 500000/*ns*/ && LOG.isDebugEnabled()) {
- LOG.debug("Resolved IP of `{}' to {} in {}ns", host, ip, latency);
- } else if (latency >= 3000000/*ns*/) {
- LOG.warn("Slow DNS lookup! Resolved IP of `{}' to {} in {}ns", host, ip, latency);
- }
- return ip;
- } catch (UnknownHostException e) {
- LOG.error("Failed to resolve the IP of `{}' in {}ns", host, (System.nanoTime() - start));
- return null;
- }
- }
-
private final class TabletClientPipeline extends DefaultChannelPipeline {
- TabletClient init(String uuid, String host, int port) {
+ TabletClient init(String uuid, String host, int port, boolean isLocal) {
AsyncKuduClient kuduClient = ConnectionCache.this.kuduClient;
- final TabletClient client = new TabletClient(kuduClient, uuid, host, port);
+ final TabletClient client = new TabletClient(kuduClient, uuid, host, port, isLocal);
if (kuduClient.getDefaultSocketReadTimeoutMs() > 0) {
super.addLast("timeout-handler",
new ReadTimeoutHandler(kuduClient.getTimer(),
http://git-wip-us.apache.org/repos/asf/kudu/blob/ff24651c/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
index f5bb44c..0b190ed 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
@@ -149,13 +149,16 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
// differently by also clearing the caches.
private volatile boolean gotUncaughtException = false;
- public TabletClient(AsyncKuduClient client, String uuid, String host, int port) {
+ private final boolean local;
+
+ public TabletClient(AsyncKuduClient client, String uuid, String host, int port, boolean local) {
this.kuduClient = client;
this.uuid = uuid;
this.socketReadTimeoutMs = client.getDefaultSocketReadTimeoutMs();
this.host = host;
this.port = port;
this.requestTracker = client.getRequestTracker();
+ this.local = local;
}
<R> void sendRpc(KuduRpc<R> rpc) {
@@ -841,6 +844,14 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
}
/**
+ * Returns if this server is on this client's host.
+ * @return true if the server is local, else false
+ */
+ boolean isLocal() {
+ return local;
+ }
+
+ /**
* Returns this tablet server's port.
* @return a port number that this tablet server is bound to
*/
http://git-wip-us.apache.org/repos/asf/kudu/blob/ff24651c/java/kudu-client/src/main/java/org/apache/kudu/util/NetUtil.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/util/NetUtil.java b/java/kudu-client/src/main/java/org/apache/kudu/util/NetUtil.java
index 589cb8f..4ca6d8c 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/util/NetUtil.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/util/NetUtil.java
@@ -23,7 +23,13 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.net.HostAndPort;
import org.apache.kudu.annotations.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.net.UnknownHostException;
import java.util.List;
/**
@@ -32,6 +38,8 @@ import java.util.List;
@InterfaceAudience.Private
public class NetUtil {
+ private static final Logger LOG = LoggerFactory.getLogger(NetUtil.class);
+
/**
* Convert a list of {@link HostAndPort} objects to a comma separate string.
* The inverse of {@link #parseStrings(String, int)}.
@@ -75,4 +83,50 @@ public class NetUtil {
}
return hostsAndPorts;
}
+
+ /**
+ * Gets a hostname or an IP address and returns an InetAddress.
+ * <p>
+ * <strong>This method can block</strong> as there is no API for
+ * asynchronous DNS resolution in the JDK.
+ * @param host the hostname to resolve
+ * @return an InetAddress for the given hostname,
+ * or {@code null} if the address couldn't be resolved
+ */
+ public static InetAddress getInetAddress(final String host) {
+ final long start = System.nanoTime();
+ try {
+ InetAddress ip = InetAddress.getByName(host);
+ long latency = System.nanoTime() - start;
+ if (latency > 500000/*ns*/ && LOG.isDebugEnabled()) {
+ LOG.debug("Resolved IP of `{}' to {} in {}ns", host, ip, latency);
+ } else if (latency >= 3000000/*ns*/) {
+ LOG.warn("Slow DNS lookup! Resolved IP of `{}' to {} in {}ns", host, ip, latency);
+ }
+ return ip;
+ } catch (UnknownHostException e) {
+ LOG.error("Failed to resolve the IP of `{}' in {}ns", host, (System.nanoTime() - start));
+ return null;
+ }
+ }
+
+ /**
+ * Given an InetAddress, checks to see if the address is a local address, by
+ * comparing the address with all the interfaces on the node.
+ * @param addr address to check if it is local node's address
+ * @return true if the address corresponds to the local node
+ */
+ public static boolean isLocalAddress(InetAddress addr) {
+ // Check if the address is any local or loopback.
+ boolean local = addr.isAnyLocalAddress() || addr.isLoopbackAddress();
+
+ // Check if the address is defined on any interface.
+ if (!local) {
+ try {
+ local = NetworkInterface.getByInetAddress(addr) != null;
+ } catch (SocketException e) {
+ }
+ }
+ return local;
+ }
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/ff24651c/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
index 3776d50..e238069 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
@@ -39,7 +39,7 @@ public class TestConnectionCache {
ConnectionCache cache = new ConnectionCache(client);
int i = 0;
for (HostAndPort hp : addresses) {
- TabletClient conn = cache.newClient(i + "", hp.getHostText(), hp.getPort());
+ TabletClient conn = cache.newClient(i + "", hp.getHostText(), hp.getPort(), false);
// Ping the process so we go through the whole connection process.
pingConnection(conn);
i++;
http://git-wip-us.apache.org/repos/asf/kudu/blob/ff24651c/java/kudu-client/src/test/java/org/apache/kudu/util/TestNetUtil.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/util/TestNetUtil.java b/java/kudu-client/src/test/java/org/apache/kudu/util/TestNetUtil.java
index cc88a3f..aa9f4b1 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/util/TestNetUtil.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/util/TestNetUtil.java
@@ -19,11 +19,11 @@ package org.apache.kudu.util;
import com.google.common.net.HostAndPort;
import org.junit.Test;
+import java.net.InetAddress;
import java.util.Arrays;
import java.util.List;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
/**
* Test for {@link NetUtil}.
@@ -70,4 +70,12 @@ public class TestNetUtil {
);
assertEquals(NetUtil.hostsAndPortsToString(hostsAndPorts), "127.0.0.1:1111,1.2.3.4.5:0");
}
+
+ @Test
+ public void testLocal() throws Exception {
+ assertTrue(NetUtil.isLocalAddress(NetUtil.getInetAddress("localhost")));
+ assertTrue(NetUtil.isLocalAddress(NetUtil.getInetAddress("127.0.0.1")));
+ assertTrue(NetUtil.isLocalAddress(InetAddress.getLocalHost()));
+ assertFalse(NetUtil.isLocalAddress(NetUtil.getInetAddress("kudu.apache.org")));
+ }
}