You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2016/10/17 17:13:42 UTC
[1/2] kudu git commit: [java client] Extract ip2client out of
AsyncKuduClient
Repository: kudu
Updated Branches:
refs/heads/master 0c52f161b -> 0c44223ea
[java client] Extract ip2client out of AsyncKuduClient
As part of making the Java client more modular and easier to test, this patch moves
almost all of the connection management into a separate class, ConnectionCache. The change
was fairly painless.
Change-Id: I48b0f3f262abd5ee26869202f661b3c25158f39c
Reviewed-on: http://gerrit.cloudera.org:8080/4717
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/e5b7ebf8
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/e5b7ebf8
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/e5b7ebf8
Branch: refs/heads/master
Commit: e5b7ebf8daa4b17a475814e62ed87d9dff4f6095
Parents: 0c52f16
Author: Jean-Daniel Cryans <jd...@apache.org>
Authored: Thu Oct 13 12:36:23 2016 -0700
Committer: Jean-Daniel Cryans <jd...@apache.org>
Committed: Fri Oct 14 21:38:26 2016 +0000
----------------------------------------------------------------------
.../org/apache/kudu/client/AsyncKuduClient.java | 373 +----------------
.../org/apache/kudu/client/ConnectionCache.java | 395 +++++++++++++++++++
.../kudu/client/TestAsyncKuduSession.java | 4 +-
3 files changed, 417 insertions(+), 355 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/e5b7ebf8/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index cbb128e..9b5ba5b 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -37,9 +37,6 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.Message;
import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;
-
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.socket.nio.NioWorkerPool;
import org.apache.kudu.Common;
import org.apache.kudu.Schema;
import org.apache.kudu.annotations.InterfaceAudience;
@@ -51,14 +48,10 @@ import org.apache.kudu.util.AsyncUtil;
import org.apache.kudu.util.NetUtil;
import org.apache.kudu.util.Pair;
import org.apache.kudu.util.Slice;
-import org.jboss.netty.channel.ChannelEvent;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.DefaultChannelPipeline;
+import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.channel.socket.SocketChannel;
-import org.jboss.netty.channel.socket.SocketChannelConfig;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
+import org.jboss.netty.channel.socket.nio.NioWorkerPool;
import org.jboss.netty.util.HashedWheelTimer;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.TimerTask;
@@ -66,19 +59,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.GuardedBy;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
@@ -86,7 +73,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -148,33 +134,7 @@ public class AsyncKuduClient implements AutoCloseable {
private final ConcurrentHashMap<String, TableLocationsCache> tableLocations =
new ConcurrentHashMap<>();
- /**
- * Cache that maps a TabletServer address ("ip:port") to the clients
- * connected to it.
- * <p>
- * Access to this map must be synchronized by locking its monitor.
- * Logging the contents of this map (or calling toString) requires copying it first.
- * <p>
- * This isn't a {@link ConcurrentHashMap} because we don't use it frequently
- * (just when connecting to / disconnecting from TabletClients) and when we
- * add something to it, we want to do an atomic get-and-put, but
- * {@code putIfAbsent} isn't a good fit for us since it requires to create
- * an object that may be "wasted" in case another thread wins the insertion
- * race, and we don't want to create unnecessary connections.
- * <p>
- * Upon disconnection, clients are automatically removed from this map.
- * We don't use a {@code ChannelGroup} because a {@code ChannelGroup} does
- * the clean-up on the {@code channelClosed} event, which is actually the
- * 3rd and last event to be fired when a channel gets disconnected. The
- * first one to get fired is, {@code channelDisconnected}. This matters to
- * us because we want to purge disconnected clients from the cache as
- * quickly as possible after the disconnection, to avoid handing out clients
- * that are going to cause unnecessary errors.
- * @see TabletClientPipeline#handleDisconnect
- */
- @VisibleForTesting
- @GuardedBy("ip2client")
- final HashMap<String, TabletClient> ip2client = new HashMap<>();
+ private final ConnectionCache connectionCache;
@GuardedBy("sessions")
private final Set<AsyncKuduSession> sessions = new HashSet<>();
@@ -241,6 +201,7 @@ public class AsyncKuduClient implements AutoCloseable {
this.timer = b.timer;
String clientId = UUID.randomUUID().toString().replace("-", "");
this.requestTracker = new RequestTracker(clientId);
+ this.connectionCache = new ConnectionCache(this);
}
/**
@@ -579,6 +540,14 @@ public class AsyncKuduClient implements AutoCloseable {
return requestTracker;
}
+ HashedWheelTimer getTimer() {
+ return timer;
+ }
+
+ ClientSocketChannelFactory getChannelFactory() {
+ return channelFactory;
+ }
+
/**
* Creates a new {@link AsyncKuduScanner.AsyncKuduScannerBuilder} for a particular table.
* @param table the name of the table you intend to scan.
@@ -958,13 +927,11 @@ public class AsyncKuduClient implements AutoCloseable {
* but calling certain methods on the returned TabletClients can. For example,
* it's possible to forcefully shutdown a connection to a tablet server by calling {@link
* TabletClient#shutdown()}.
- * @return Copy of the current TabletClients list
+ * @return copy of the current TabletClients list
*/
@VisibleForTesting
List<TabletClient> getTabletClients() {
- synchronized (ip2client) {
- return new ArrayList<TabletClient>(ip2client.values());
- }
+ return connectionCache.getTabletClients();
}
/**
@@ -1476,7 +1443,7 @@ public class AsyncKuduClient implements AutoCloseable {
* @return A live and initialized client for the specified master server.
*/
TabletClient newMasterClient(HostAndPort masterHostPort) {
- String ip = getIP(masterHostPort.getHostText());
+ String ip = ConnectionCache.getIP(masterHostPort.getHostText());
if (ip == null) {
return null;
}
@@ -1484,36 +1451,11 @@ public class AsyncKuduClient implements AutoCloseable {
// communicate with the masters to find out about them, and that's what we're trying to do.
// The UUID is used for logging, so instead we're passing the "master table name" followed by
// host and port which is enough to identify the node we're connecting to.
- return newClient(MASTER_TABLE_NAME_PLACEHOLDER + " - " + masterHostPort.toString(),
+ return connectionCache.newClient(
+ MASTER_TABLE_NAME_PLACEHOLDER + " - " + masterHostPort.toString(),
ip, masterHostPort.getPort());
}
- TabletClient newClient(String uuid, final String host, final int port) {
- final String hostport = host + ':' + port;
- TabletClient client;
- SocketChannel chan;
- synchronized (ip2client) {
- client = ip2client.get(hostport);
- if (client != null && client.isAlive()) {
- return client;
- }
- final TabletClientPipeline pipeline = new TabletClientPipeline();
- client = pipeline.init(uuid, host, port);
- chan = channelFactory.newChannel(pipeline);
- TabletClient oldClient = ip2client.put(hostport, client);
- assert oldClient == null;
- }
- final SocketChannelConfig config = chan.getConfig();
- config.setConnectTimeoutMillis(5000);
- config.setTcpNoDelay(true);
- // Unfortunately there is no way to override the keep-alive timeout in
- // Java since the JRE doesn't expose any way to call setsockopt() with
- // TCP_KEEPIDLE. And of course the default timeout is >2h. Sigh.
- config.setKeepAlive(true);
- chan.connect(new InetSocketAddress(host, port)); // Won't block.
- return client;
- }
-
/**
* Invokes {@link #shutdown()} and waits for the configured admin timeout. This method returns
* void, so consider invoking shutdown directly if there's a need to handle dangling RPCs.
@@ -1577,7 +1519,7 @@ public class AsyncKuduClient implements AutoCloseable {
final class DisconnectCB implements Callback<Deferred<ArrayList<Void>>,
ArrayList<List<OperationResponse>>> {
public Deferred<ArrayList<Void>> call(ArrayList<List<OperationResponse>> ignoredResponses) {
- return disconnectEverything().addCallback(new ReleaseResourcesCB());
+ return connectionCache.disconnectEverything().addCallback(new ReleaseResourcesCB());
}
public String toString() {
return "disconnect callback";
@@ -1614,287 +1556,12 @@ public class AsyncKuduClient implements AutoCloseable {
return Deferred.group(deferreds);
}
- /**
- * Closes every socket, which will also cancel all the RPCs in flight.
- */
- private Deferred<ArrayList<Void>> disconnectEverything() {
- ArrayList<Deferred<Void>> deferreds =
- new ArrayList<Deferred<Void>>(2);
- HashMap<String, TabletClient> ip2client_copy;
- synchronized (ip2client) {
- // Make a local copy so we can shutdown every Tablet Server clients
- // without hold the lock while we iterate over the data structure.
- ip2client_copy = new HashMap<String, TabletClient>(ip2client);
- }
-
- for (TabletClient ts : ip2client_copy.values()) {
- deferreds.add(ts.shutdown());
- }
- final int size = deferreds.size();
- return Deferred.group(deferreds).addCallback(
- new Callback<ArrayList<Void>, ArrayList<Void>>() {
- public ArrayList<Void> call(final ArrayList<Void> arg) {
- // Normally, now that we've shutdown() every client, all our caches should
- // be empty since each shutdown() generates a DISCONNECTED event, which
- // causes TabletClientPipeline to call removeClientFromIpCache().
- HashMap<String, TabletClient> logme = null;
- synchronized (ip2client) {
- if (!ip2client.isEmpty()) {
- logme = new HashMap<String, TabletClient>(ip2client);
- }
- }
- if (logme != null) {
- // Putting this logging statement inside the synchronized block
- // can lead to a deadlock, since HashMap.toString() is going to
- // call TabletClient.toString() on each entry, and this locks the
- // client briefly. Other parts of the code lock clients first and
- // the ip2client HashMap second, so this can easily deadlock.
- LOG.error("Some clients are left in the client cache and haven't"
- + " been cleaned up: " + logme);
- }
- return arg;
- }
-
- public String toString() {
- return "wait " + size + " TabletClient.shutdown()";
- }
- });
- }
-
- /**
- * Blocking call.
- * Performs a slow search of the IP used by the given client.
- * <p>
- * This is needed when we're trying to find the IP of the client before its
- * channel has successfully connected, because Netty's API offers no way of
- * retrieving the IP of the remote peer until we're connected to it.
- * @param client The client we want the IP of.
- * @return The IP of the client, or {@code null} if we couldn't find it.
- */
- private InetSocketAddress slowSearchClientIP(final TabletClient client) {
- String hostport = null;
- synchronized (ip2client) {
- for (final Map.Entry<String, TabletClient> e : ip2client.entrySet()) {
- if (e.getValue() == client) {
- hostport = e.getKey();
- break;
- }
- }
- }
-
- if (hostport == null) {
- HashMap<String, TabletClient> copy;
- synchronized (ip2client) {
- copy = new HashMap<String, TabletClient>(ip2client);
- }
- LOG.error("WTF? Should never happen! Couldn't find " + client
- + " in " + copy);
- return null;
- }
- final int colon = hostport.indexOf(':', 1);
- if (colon < 1) {
- LOG.error("WTF? Should never happen! No `:' found in " + hostport);
- return null;
- }
- final String host = getIP(hostport.substring(0, colon));
- if (host == null) {
- // getIP will print the reason why, there's nothing else we can do.
- return null;
- }
-
- int port;
- try {
- port = parsePortNumber(hostport.substring(colon + 1,
- hostport.length()));
- } catch (NumberFormatException e) {
- LOG.error("WTF? Should never happen! Bad port in " + hostport, e);
- return null;
- }
- return new InetSocketAddress(host, port);
- }
-
- /**
- * Removes the given client from the `ip2client` cache.
- * @param client The client for which we must clear the ip cache
- * @param remote The address of the remote peer, if known, or null
- */
- private void removeClientFromIpCache(final TabletClient client,
- final SocketAddress remote) {
-
- if (remote == null) {
- return; // Can't continue without knowing the remote address.
- }
-
- String hostport;
- if (remote instanceof InetSocketAddress) {
- final InetSocketAddress sock = (InetSocketAddress) remote;
- final InetAddress addr = sock.getAddress();
- if (addr == null) {
- LOG.error("WTF? Unresolved IP for " + remote
- + ". This shouldn't happen.");
- return;
- } else {
- hostport = addr.getHostAddress() + ':' + sock.getPort();
- }
- } else {
- LOG.error("WTF? Found a non-InetSocketAddress remote: " + remote
- + ". This shouldn't happen.");
- return;
- }
-
- TabletClient old;
- synchronized (ip2client) {
- old = ip2client.remove(hostport);
- }
-
- LOG.debug("Removed from IP cache: {" + hostport + "} -> {" + client + "}");
- if (old == null) {
- // Currently we're seeing this message when masters are disconnected and the hostport we got
- // above is different than the one the user passes (that we use to populate ip2client). At
- // worst this doubles the entries for masters, which has an insignificant impact.
- // TODO When fixed, make this a WARN again.
- LOG.trace("When expiring " + client + " from the client cache (host:port="
- + hostport + "), it was found that there was no entry"
- + " corresponding to " + remote + ". This shouldn't happen.");
- }
- }
-
private boolean isMasterTable(String tableId) {
// Checking that it's the same instance so there's absolutely no chance of confusing the master
// 'table' for a user one.
return MASTER_TABLE_NAME_PLACEHOLDER == tableId;
}
- private final class TabletClientPipeline extends DefaultChannelPipeline {
-
- private final Logger log = LoggerFactory.getLogger(TabletClientPipeline.class);
- /**
- * Have we already disconnected?.
- * We use this to avoid doing the cleanup work for the same client more
- * than once, even if we get multiple events indicating that the client
- * is no longer connected to the TabletServer (e.g. DISCONNECTED, CLOSED).
- * No synchronization needed as this is always accessed from only one
- * thread at a time (equivalent to a non-shared state in a Netty handler).
- */
- private boolean disconnected = false;
-
- TabletClient init(String uuid, String host, int port) {
- final TabletClient client = new TabletClient(AsyncKuduClient.this, uuid, host, port);
- if (defaultSocketReadTimeoutMs > 0) {
- super.addLast("timeout-handler",
- new ReadTimeoutHandler(timer,
- defaultSocketReadTimeoutMs,
- TimeUnit.MILLISECONDS));
- }
- super.addLast("kudu-handler", client);
-
- return client;
- }
-
- @Override
- public void sendDownstream(final ChannelEvent event) {
- if (event instanceof ChannelStateEvent) {
- handleDisconnect((ChannelStateEvent) event);
- }
- super.sendDownstream(event);
- }
-
- @Override
- public void sendUpstream(final ChannelEvent event) {
- if (event instanceof ChannelStateEvent) {
- handleDisconnect((ChannelStateEvent) event);
- }
- super.sendUpstream(event);
- }
-
- private void handleDisconnect(final ChannelStateEvent state_event) {
- if (disconnected) {
- return;
- }
- switch (state_event.getState()) {
- case OPEN:
- if (state_event.getValue() == Boolean.FALSE) {
- break; // CLOSED
- }
- return;
- case CONNECTED:
- if (state_event.getValue() == null) {
- break; // DISCONNECTED
- }
- return;
- default:
- return; // Not an event we're interested in, ignore it.
- }
-
- disconnected = true; // So we don't clean up the same client twice.
- try {
- final TabletClient client = super.get(TabletClient.class);
- SocketAddress remote = super.getChannel().getRemoteAddress();
- // At this point Netty gives us no easy way to access the
- // SocketAddress of the peer we tried to connect to. This
- // kinda sucks but I couldn't find an easier way.
- if (remote == null) {
- remote = slowSearchClientIP(client);
- }
-
- synchronized (client) {
- removeClientFromIpCache(client, remote);
- }
- } catch (Exception e) {
- log.error("Uncaught exception when handling a disconnection of " + getChannel(), e);
- }
- }
-
- }
-
- /**
- * Gets a hostname or an IP address and returns the textual representation
- * of the IP address.
- * <p>
- * <strong>This method can block</strong> as there is no API for
- * asynchronous DNS resolution in the JDK.
- * @param host The hostname to resolve.
- * @return The IP address associated with the given hostname,
- * or {@code null} if the address couldn't be resolved.
- */
- private static String getIP(final String host) {
- final long start = System.nanoTime();
- try {
- final String ip = InetAddress.getByName(host).getHostAddress();
- final long latency = System.nanoTime() - start;
- if (latency > 500000/*ns*/ && LOG.isDebugEnabled()) {
- LOG.debug("Resolved IP of `" + host + "' to "
- + ip + " in " + latency + "ns");
- } else if (latency >= 3000000/*ns*/) {
- LOG.warn("Slow DNS lookup! Resolved IP of `" + host + "' to "
- + ip + " in " + latency + "ns");
- }
- return ip;
- } catch (UnknownHostException e) {
- LOG.error("Failed to resolve the IP of `" + host + "' in "
- + (System.nanoTime() - start) + "ns");
- return null;
- }
- }
-
- /**
- * Parses a TCP port number from a string.
- * @param portnum The string to parse.
- * @return A strictly positive, validated port number.
- * @throws NumberFormatException if the string couldn't be parsed as an
- * integer or if the value was outside of the range allowed for TCP ports.
- */
- private static int parsePortNumber(final String portnum)
- throws NumberFormatException {
- final int port = Integer.parseInt(portnum);
- if (port <= 0 || port > 65535) {
- throw new NumberFormatException(port == 0 ? "port is zero" :
- (port < 0 ? "port is negative: "
- : "port is too large: ") + port);
- }
- return port;
- }
-
void newTimeout(final TimerTask task, final long timeout_ms) {
try {
timer.newTimeout(task, timeout_ms, MILLISECONDS);
@@ -2005,11 +1672,11 @@ public class AsyncKuduClient implements AutoCloseable {
@GuardedBy("tabletServers")
private void addTabletClient(String uuid, String host, int port, boolean isLeader)
throws UnknownHostException {
- String ip = getIP(host);
+ String ip = ConnectionCache.getIP(host);
if (ip == null) {
throw new UnknownHostException("Failed to resolve the IP of `" + host + "'");
}
- TabletClient client = newClient(uuid, ip, port);
+ TabletClient client = connectionCache.newClient(uuid, ip, port);
tabletServers.add(client);
if (isLeader) {
http://git-wip-us.apache.org/repos/asf/kudu/blob/e5b7ebf8/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
new file mode 100644
index 0000000..f421d32
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
@@ -0,0 +1,395 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.client;
+
+import com.stumbleupon.async.Callback;
+import com.stumbleupon.async.Deferred;
+import org.apache.kudu.annotations.InterfaceAudience;
+import org.apache.kudu.annotations.InterfaceStability;
+import org.jboss.netty.channel.ChannelEvent;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.DefaultChannelPipeline;
+import org.jboss.netty.channel.socket.SocketChannel;
+import org.jboss.netty.channel.socket.SocketChannelConfig;
+import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The ConnectionCache is responsible for managing connections to masters and tablets. There
+ * should only be one instance per Kudu client, and can <strong>not</strong> be shared between
+ * clients.
+ * <p>
+ * This class is thread-safe.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class ConnectionCache {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ConnectionCache.class);
+
+ /**
+ * Cache that maps a TabletServer address ("ip:port") to the clients
+ * connected to it.
+ * <p>
+ * Access to this map must be synchronized by locking its monitor.
+ * Logging the contents of this map (or calling toString) requires copying it first.
+ * <p>
+ * This isn't a {@link ConcurrentHashMap} because we don't use it frequently
+ * (just when connecting to / disconnecting from TabletClients) and when we
+ * add something to it, we want to do an atomic get-and-put, but
+ * {@code putIfAbsent} isn't a good fit for us since it requires to create
+ * an object that may be "wasted" in case another thread wins the insertion
+ * race, and we don't want to create unnecessary connections.
+ * <p>
+ * Upon disconnection, clients are automatically removed from this map.
+ * We don't use a {@code ChannelGroup} because a {@code ChannelGroup} does
+ * the clean-up on the {@code channelClosed} event, which is actually the
+ * 3rd and last event to be fired when a channel gets disconnected. The
+ * first one to get fired is, {@code channelDisconnected}. This matters to
+ * us because we want to purge disconnected clients from the cache as
+ * quickly as possible after the disconnection, to avoid handing out clients
+ * that are going to cause unnecessary errors.
+ * @see TabletClientPipeline#handleDisconnect
+ */
+ @GuardedBy("ip2client")
+ private final HashMap<String, TabletClient> ip2client = new HashMap<>();
+
+ private final AsyncKuduClient kuduClient;
+
+ /**
+ * Create a new empty ConnectionCache that will used the passed client to create connections.
+ * @param client a client that contains the information we need to create connections
+ */
+ ConnectionCache(AsyncKuduClient client) {
+ this.kuduClient = client;
+ }
+
+ TabletClient newClient(String uuid, final String host, final int port) {
+ final String hostport = host + ':' + port;
+ TabletClient client;
+ SocketChannel chan;
+ synchronized (ip2client) {
+ client = ip2client.get(hostport);
+ if (client != null && client.isAlive()) {
+ return client;
+ }
+ final TabletClientPipeline pipeline = new TabletClientPipeline();
+ client = pipeline.init(uuid, host, port);
+ chan = this.kuduClient.getChannelFactory().newChannel(pipeline);
+ TabletClient oldClient = ip2client.put(hostport, client);
+ assert oldClient == null;
+ }
+ final SocketChannelConfig config = chan.getConfig();
+ config.setConnectTimeoutMillis(5000);
+ config.setTcpNoDelay(true);
+ // Unfortunately there is no way to override the keep-alive timeout in
+ // Java since the JRE doesn't expose any way to call setsockopt() with
+ // TCP_KEEPIDLE. And of course the default timeout is >2h. Sigh.
+ config.setKeepAlive(true);
+ chan.connect(new InetSocketAddress(host, port)); // Won't block.
+ return client;
+ }
+
+ /**
+ * Closes every socket, which will also cancel all the RPCs in flight.
+ */
+ Deferred<ArrayList<Void>> disconnectEverything() {
+ ArrayList<Deferred<Void>> deferreds = new ArrayList<>(2);
+ HashMap<String, TabletClient> ip2client_copy;
+ synchronized (ip2client) {
+ // Make a local copy so we can shutdown every Tablet Server clients
+ // without hold the lock while we iterate over the data structure.
+ ip2client_copy = new HashMap<>(ip2client);
+ }
+
+ for (TabletClient ts : ip2client_copy.values()) {
+ deferreds.add(ts.shutdown());
+ }
+ final int size = deferreds.size();
+ return Deferred.group(deferreds).addCallback(
+ new Callback<ArrayList<Void>, ArrayList<Void>>() {
+ public ArrayList<Void> call(final ArrayList<Void> arg) {
+ // Normally, now that we've shutdown() every client, all our caches should
+ // be empty since each shutdown() generates a DISCONNECTED event, which
+ // causes TabletClientPipeline to call removeClientFromIpCache().
+ HashMap<String, TabletClient> logme = null;
+ synchronized (ip2client) {
+ if (!ip2client.isEmpty()) {
+ logme = new HashMap<>(ip2client);
+ }
+ }
+ if (logme != null) {
+ // Putting this logging statement inside the synchronized block
+ // can lead to a deadlock, since HashMap.toString() is going to
+ // call TabletClient.toString() on each entry, and this locks the
+ // client briefly. Other parts of the code lock clients first and
+ // the ip2client HashMap second, so this can easily deadlock.
+ LOG.error("Some clients are left in the client cache and haven't"
+ + " been cleaned up: " + logme);
+ }
+ return arg;
+ }
+
+ public String toString() {
+ return "wait " + size + " TabletClient.shutdown()";
+ }
+ });
+ }
+
+ /**
+ * Modifying the list returned by this method won't affect this cache,
+ * but calling certain methods on the returned TabletClients can. For example,
+ * it's possible to forcefully shutdown a connection to a tablet server by calling {@link
+ * TabletClient#shutdown()}.
+ * @return copy of the current TabletClients list
+ */
+ List<TabletClient> getTabletClients() {
+ synchronized (ip2client) {
+ return new ArrayList<>(ip2client.values());
+ }
+ }
+
+ /**
+ * Blocking call. Performs a slow search of the IP used by the given client.
+ * <p>
+ * This is needed when we're trying to find the IP of the client before its
+ * channel has successfully connected, because Netty's API offers no way of
+ * retrieving the IP of the remote peer until we're connected to it.
+ * @param client the client we want the IP of
+ * @return the IP of the client, or {@code null} if we couldn't find it
+ */
+ private InetSocketAddress slowSearchClientIP(final TabletClient client) {
+ String hostport = null;
+ synchronized (ip2client) {
+ for (final Map.Entry<String, TabletClient> e : ip2client.entrySet()) {
+ if (e.getValue() == client) {
+ hostport = e.getKey();
+ break;
+ }
+ }
+ }
+
+ if (hostport == null) {
+ HashMap<String, TabletClient> copy;
+ synchronized (ip2client) {
+ copy = new HashMap<>(ip2client);
+ }
+ LOG.error("Couldn't find {} in {}", client, copy);
+ return null;
+ }
+ final int colon = hostport.indexOf(':', 1);
+ if (colon < 1) {
+ LOG.error("No `:' found in {}", hostport);
+ return null;
+ }
+ final String host = getIP(hostport.substring(0, colon));
+ if (host == null) {
+ // getIP will print the reason why, there's nothing else we can do.
+ return null;
+ }
+
+ int port;
+ try {
+ port = parsePortNumber(hostport.substring(colon + 1,
+ hostport.length()));
+ } catch (NumberFormatException e) {
+ LOG.error("Bad port in {}", hostport, e);
+ return null;
+ }
+ return new InetSocketAddress(host, port);
+ }
+
+ /**
+ * Removes the given client from the `ip2client` cache.
+ * @param client the client for which we must clear the ip cache
+ * @param remote the address of the remote peer, if known, or null
+ */
+ private void removeClientFromIpCache(final TabletClient client,
+ final SocketAddress remote) {
+
+ if (remote == null) {
+ return; // Can't continue without knowing the remote address.
+ }
+
+ String hostport;
+ if (remote instanceof InetSocketAddress) {
+ final InetSocketAddress sock = (InetSocketAddress) remote;
+ final InetAddress addr = sock.getAddress();
+ if (addr == null) {
+ LOG.error("Unresolved IP for {}. This shouldn't happen.", remote);
+ return;
+ } else {
+ hostport = addr.getHostAddress() + ':' + sock.getPort();
+ }
+ } else {
+ LOG.error("Found a non-InetSocketAddress remote: {}. This shouldn't happen.", remote);
+ return;
+ }
+
+ TabletClient old;
+ synchronized (ip2client) {
+ old = ip2client.remove(hostport);
+ }
+
+ LOG.debug("Removed from IP cache: {" + hostport + "} -> {" + client + "}");
+ if (old == null) {
+ LOG.trace("When expiring {} from the client cache (host:port={}"
+ + "), it was found that there was no entry"
+ + " corresponding to {}. This shouldn't happen.", client, hostport, remote);
+ }
+ }
+
+ /**
+ * Gets a hostname or an IP address and returns the textual representation
+ * of the IP address.
+ * <p>
+ * <strong>This method can block</strong> as there is no API for
+ * asynchronous DNS resolution in the JDK.
+ * @param host the hostname to resolve
+ * @return the IP address associated with the given hostname,
+ * or {@code null} if the address couldn't be resolved
+ */
+ static String getIP(final String host) {
+ final long start = System.nanoTime();
+ try {
+ final String ip = InetAddress.getByName(host).getHostAddress();
+ final long latency = System.nanoTime() - start;
+ if (latency > 500000/*ns*/ && LOG.isDebugEnabled()) {
+ LOG.debug("Resolved IP of `{}' to {} in {}ns", host, ip, latency);
+ } else if (latency >= 3000000/*ns*/) {
+ LOG.warn("Slow DNS lookup! Resolved IP of `{}' to {} in {}ns", host, ip, latency);
+ }
+ return ip;
+ } catch (UnknownHostException e) {
+ LOG.error("Failed to resolve the IP of `{}' in {}ns", host, (System.nanoTime() - start));
+ return null;
+ }
+ }
+
+ /**
+ * Parses a TCP port number from a string.
+ * @param portnum the string to parse
+ * @return a strictly positive, validated port number
+ * @throws NumberFormatException if the string couldn't be parsed as an
+ * integer or if the value was outside of the range allowed for TCP ports
+ */
+ private static int parsePortNumber(final String portnum)
+ throws NumberFormatException {
+ final int port = Integer.parseInt(portnum);
+ if (port <= 0 || port > 65535) {
+ throw new NumberFormatException(port == 0 ? "port is zero" :
+ (port < 0 ? "port is negative: "
+ : "port is too large: ") + port);
+ }
+ return port;
+ }
+
+ private final class TabletClientPipeline extends DefaultChannelPipeline {
+
+ private final Logger log = LoggerFactory.getLogger(TabletClientPipeline.class);
+ /**
+ * Have we already disconnected?.
+ * We use this to avoid doing the cleanup work for the same client more
+ * than once, even if we get multiple events indicating that the client
+ * is no longer connected to the TabletServer (e.g. DISCONNECTED, CLOSED).
+ * No synchronization needed as this is always accessed from only one
+ * thread at a time (equivalent to a non-shared state in a Netty handler).
+ */
+ private boolean disconnected = false;
+
+ TabletClient init(String uuid, String host, int port) {
+ AsyncKuduClient kuduClient = ConnectionCache.this.kuduClient;
+ final TabletClient client = new TabletClient(kuduClient, uuid, host, port);
+ if (kuduClient.getDefaultSocketReadTimeoutMs() > 0) {
+ super.addLast("timeout-handler",
+ new ReadTimeoutHandler(kuduClient.getTimer(),
+ kuduClient.getDefaultSocketReadTimeoutMs(),
+ TimeUnit.MILLISECONDS));
+ }
+ super.addLast("kudu-handler", client);
+
+ return client;
+ }
+
+ @Override
+ public void sendDownstream(final ChannelEvent event) {
+ if (event instanceof ChannelStateEvent) {
+ handleDisconnect((ChannelStateEvent) event);
+ }
+ super.sendDownstream(event);
+ }
+
+ @Override
+ public void sendUpstream(final ChannelEvent event) {
+ if (event instanceof ChannelStateEvent) {
+ handleDisconnect((ChannelStateEvent) event);
+ }
+ super.sendUpstream(event);
+ }
+
+ private void handleDisconnect(final ChannelStateEvent state_event) {
+ if (disconnected) {
+ return;
+ }
+ switch (state_event.getState()) {
+ case OPEN:
+ if (state_event.getValue() == Boolean.FALSE) {
+ break; // CLOSED
+ }
+ return;
+ case CONNECTED:
+ if (state_event.getValue() == null) {
+ break; // DISCONNECTED
+ }
+ return;
+ default:
+ return; // Not an event we're interested in, ignore it.
+ }
+
+ disconnected = true; // So we don't clean up the same client twice.
+ try {
+ final TabletClient client = super.get(TabletClient.class);
+ SocketAddress remote = super.getChannel().getRemoteAddress();
+ // At this point Netty gives us no easy way to access the
+ // SocketAddress of the peer we tried to connect to. This
+ // kinda sucks but I couldn't find an easier way.
+ if (remote == null) {
+ remote = slowSearchClientIP(client);
+ }
+
+ synchronized (client) {
+ removeClientFromIpCache(client, remote);
+ }
+ } catch (Exception e) {
+ log.error("Uncaught exception when handling a disconnection of " + getChannel(), e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kudu/blob/e5b7ebf8/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
index 2de16fd..2c191ff 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
@@ -195,7 +195,7 @@ public class TestAsyncKuduSession extends BaseKuduTest {
session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
session.apply(createBasicSchemaInsert(nonReplicatedTable, 1)).join();
- int numClientsBefore = client.ip2client.size();
+ int numClientsBefore = client.getTabletClients().size();
// Restart all the tablet servers.
killTabletServers();
@@ -206,7 +206,7 @@ public class TestAsyncKuduSession extends BaseKuduTest {
session.apply(createBasicSchemaInsert(nonReplicatedTable, 2)).join();
// We should not have leaked an entry in the client2tablets map.
- int numClientsAfter = client.ip2client.size();
+ int numClientsAfter = client.getTabletClients().size();
assertEquals(numClientsBefore, numClientsAfter);
} finally {
restartTabletServers();
[2/2] kudu git commit: ITBLL: use a faster PRNG
Posted by to...@apache.org.
ITBLL: use a faster PRNG
The SecureRandom PRNG is very slow. Since we don't need
cryptographic random numbers, we can use a simpler generator. This switches
to Xoroshiro128+, which is much faster but still has the long period
(2^128 - 1) needed to avoid collisions.
The implementation is from SquidLib[1], which is Apache licensed. The
file itself has a license header indicating it is public domain. The
original C implementation is also in the public domain[2]. Given that, I
didn't reference this in LICENSE.txt or NOTICE.
It's copy-pasted rather than introduced as a dependency because SquidLib
is a library for writing turn-based games in Swing -- it just happens to
have a good RNG in it.
[1] https://github.com/SquidPony/SquidLib/blob/master/squidlib-util/src/main/java/squidpony/squidmath/XoRoRNG.java
at revision b4fb31efe527d3298b8f37f3cc72e957579ad6e3
[2] http://xoroshiro.di.unimi.it/xoroshiro128plus.c
Change-Id: I2f51664af25b9fb4309dd78556e954bf483d22c0
Reviewed-on: http://gerrit.cloudera.org:8080/4731
Tested-by: Kudu Jenkins
Reviewed-by: Jean-Daniel Cryans <jd...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/0c44223e
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/0c44223e
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/0c44223e
Branch: refs/heads/master
Commit: 0c44223ea9e8561d9c48299007094f6f0d3f495f
Parents: e5b7ebf
Author: Todd Lipcon <to...@apache.org>
Authored: Thu Oct 13 22:24:51 2016 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Mon Oct 17 17:13:25 2016 +0000
----------------------------------------------------------------------
.../tools/IntegrationTestBigLinkedList.java | 47 +++++++++++++++++++-
1 file changed, 45 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/0c44223e/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/IntegrationTestBigLinkedList.java
----------------------------------------------------------------------
diff --git a/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/IntegrationTestBigLinkedList.java b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/IntegrationTestBigLinkedList.java
index 7a2d6ac..5a0c0ea 100644
--- a/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/IntegrationTestBigLinkedList.java
+++ b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/IntegrationTestBigLinkedList.java
@@ -304,6 +304,49 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
}
/**
+ * Implementation of the Xoroshiro128+ PRNG.
+ * Copied under the public domain from SquidLib.
+ *
+ * <p>Much cheaper per value than {@link SecureRandom}, and it keeps 128 bits of state
+ * (two longs) instead of the 48 bits of {@code java.util.Random}. It is not
+ * cryptographically secure. Its state is updated without any synchronization, so each
+ * instance should be confined to a single thread.
+ */
+ private static class Xoroshiro128PlusRandom {
+ /** Increment used when expanding a seed into the two state words. */
+ private static final long GOLDEN_GAMMA = 0x9E3779B97F4A7C15L;
+
+ // Generator state. If both words were zero, nextLong() would return 0 forever.
+ private long state0;
+ private long state1;
+
+ /**
+ * Creates a generator seeded once from {@link SecureRandom}. Independently created
+ * generators (for example one per record reader) are then very unlikely to share a seed.
+ * The previous seed, derived from {@code Math.random()}, was non-negative, at most 53 bits
+ * wide and came from a JVM-wide 48-bit {@code java.util.Random}.
+ */
+ public Xoroshiro128PlusRandom() {
+ this(new SecureRandom().nextLong());
+ }
+
+ /**
+ * Creates a generator whose output sequence is fully determined by {@code seed}.
+ * The seed is expanded into the two state words with a SplitMix64-style finalizer.
+ *
+ * @param seed any 64-bit value
+ */
+ public Xoroshiro128PlusRandom(long seed) {
+ long state = seed + GOLDEN_GAMMA;
+ state0 = mix64(state);
+ state += state0 + GOLDEN_GAMMA;
+ state1 = mix64(state);
+ }
+
+ /** SplitMix64 finalizer: scrambles the bits of {@code z}. */
+ private static long mix64(long z) {
+ z = (z ^ (z >>> 30)) * 0xBF58476D1CE4E5B9L;
+ z = (z ^ (z >>> 27)) * 0x94D049BB133111EBL;
+ return z ^ (z >>> 31);
+ }
+
+ /**
+ * Returns the next 64 pseudo-random bits and advances the state.
+ *
+ * @return the sum of the two state words taken before the update
+ */
+ public long nextLong() {
+ final long s0 = state0;
+ long s1 = state1;
+ final long result = s0 + s1;
+
+ s1 ^= s0;
+ // Rotation/shift constants a = 55, b = 14, c = 36 of the xoroshiro128+ reference code.
+ state0 = Long.rotateLeft(s0, 55) ^ s1 ^ (s1 << 14);
+ state1 = Long.rotateLeft(s1, 36);
+
+ return result;
+ }
+
+ /**
+ * Fills {@code bytes} with pseudo-random bytes. The array is filled from its end, and
+ * each call to {@link #nextLong()} supplies up to 8 bytes, lowest-order byte first.
+ * Within each chunk the long's low bytes therefore appear in big-endian order.
+ *
+ * @param bytes the array to fill; an empty array consumes no randomness
+ */
+ public void nextBytes(final byte[] bytes) {
+ int i = bytes.length;
+ while (i != 0) {
+ int n = Math.min(i, 8);
+ for (long bits = nextLong(); n-- != 0; bits >>>= 8) {
+ bytes[--i] = (byte) bits;
+ }
+ }
+ }
+ }
+
+ /**
* A Map only job that generates random linked list and stores them.
*/
static class Generator extends Configured implements Tool {
@@ -331,7 +374,7 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
static class GeneratorRecordReader extends RecordReader<BytesWritable,NullWritable> {
private long count;
private long numNodes;
- private Random rand;
+ private Xoroshiro128PlusRandom rand;
@Override
public void close() throws IOException {
@@ -359,7 +402,7 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool {
throws IOException, InterruptedException {
numNodes = context.getConfiguration().getLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, 25000000);
// Use SecureRandom to avoid issue described in HBASE-13382.
- rand = new SecureRandom();
+ rand = new Xoroshiro128PlusRandom();
}
@Override