You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2017/07/05 20:59:33 UTC
[1/2] kudu git commit: [java] separating Connection
Repository: kudu
Updated Branches:
refs/heads/master 7e7452704 -> 58248841f
http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
new file mode 100644
index 0000000..3f8de9b
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
@@ -0,0 +1,406 @@
+/*
+ * Copyright (C) 2010-2012 The Async HBase Authors. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ * - Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * - Neither the name of the StumbleUpon nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package org.apache.kudu.client;
+
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.Message;
+import com.stumbleupon.async.Callback;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kudu.WireProtocol;
+import org.apache.kudu.master.Master;
+import org.apache.kudu.rpc.RpcHeader;
+import org.apache.kudu.rpc.RpcHeader.RpcFeatureFlag;
+import org.apache.kudu.tserver.Tserver;
+import org.apache.kudu.util.Pair;
+
+
+/**
+ * This is a 'stateless' helper to send RPCs to a Kudu server ('stateless' in the sense that it
+ * does not keep any state itself besides the references to the {@link AsyncKuduClient} and
+ * {@link Connection} objects).
+ * <p>
+ * This helper serializes and de-serializes RPC requests and responses and provides handy
+ * methods to send the serialized RPC to the underlying {@link Connection} and to handle the
+ * response from it.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class RpcProxy {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RpcProxy.class);
+
+ /** The reference to the top-level Kudu client object. */
+ private final AsyncKuduClient client;
+
+ /** The reference to the object representing connection to the target server. */
+ private final Connection connection;
+
+ /**
+ * Construct RpcProxy object.
+ *
+ * @param client top-level Kudu client object
+ * @param connection the connection associated with the target Kudu server
+ */
+ RpcProxy(AsyncKuduClient client, Connection connection) {
+ Preconditions.checkNotNull(client);
+ Preconditions.checkNotNull(connection);
+ this.client = client;
+ this.connection = connection;
+ }
+
+ /**
+ * Send the specified RPC using the connection to the Kudu server.
+ *
+ * @param <R> type of the RPC
+ * @param rpc the RPC to send over the connection
+ */
+ <R> void sendRpc(final KuduRpc<R> rpc) {
+ sendRpc(client, connection, rpc);
+ }
+
+ /**
+ * Send the specified RPC using the connection to the Kudu server.
+ *
+ * @param <R> type of the RPC
+ * @param client client object to handle response and sending retries, if needed
+ * @param connection connection to send the request over
+ * @param rpc the RPC to send over the connection
+ */
+ static <R> void sendRpc(final AsyncKuduClient client,
+ final Connection connection,
+ final KuduRpc<R> rpc) {
+ try {
+ if (!rpc.getRequiredFeatures().isEmpty()) {
+ // An extra optimization: when the peer's features are already known, check that the server
+ // supports feature flags, if those are required.
+ Set<RpcFeatureFlag> features = connection.getPeerFeatures();
+ if (features != null &&
+ !features.contains(RpcHeader.RpcFeatureFlag.APPLICATION_FEATURE_FLAGS)) {
+ throw new NonRecoverableException(Status.NotSupported(
+ "the server does not support the APPLICATION_FEATURE_FLAGS RPC feature"));
+ }
+ }
+
+ Preconditions.checkArgument(rpc.hasDeferred());
+ rpc.addTrace(
+ new RpcTraceFrame.RpcTraceFrameBuilder(
+ rpc.method(),
+ RpcTraceFrame.Action.SEND_TO_SERVER)
+ .serverInfo(connection.getServerInfo())
+ .build());
+
+ if (!rpc.deadlineTracker.hasDeadline()) {
+ LOG.warn("{} sending RPC with no timeout {}", connection.getLogPrefix(), rpc);
+ }
+ connection.enqueueMessage(rpcToMessage(client, rpc),
+ new Callback<Void, Connection.CallResponseInfo>() {
+ @Override
+ public Void call(Connection.CallResponseInfo callResponseInfo) throws Exception {
+ try {
+ responseReceived(client, connection, rpc,
+ callResponseInfo.response, callResponseInfo.exception);
+ } catch (Exception e) {
+ rpc.errback(e);
+ }
+ return null;
+ }
+ });
+ } catch (RecoverableException e) {
+ // This is to handle RecoverableException(Status.IllegalState()) from
+ // Connection.enqueueMessage() if the connection turned into the DISCONNECTED state.
+ client.handleRetryableError(rpc, e);
+ } catch (Exception e) {
+ rpc.errback(e);
+ }
+ }
+
+ /**
+ * Build an {@link RpcOutboundMessage} from a {@link KuduRpc}.
+ *
+ * @param <R> type of the RPC
+ * @param client client object to handle response and sending retries, if needed
+ * @param rpc the RPC to convert into outbound message
+ * @return the result {@link RpcOutboundMessage}
+ */
+ private static <R> RpcOutboundMessage rpcToMessage(
+ final AsyncKuduClient client,
+ final KuduRpc<R> rpc) {
+ // The callId is set by Connection.enqueueMessage().
+ final RpcHeader.RequestHeader.Builder headerBuilder = RpcHeader.RequestHeader.newBuilder()
+ .addAllRequiredFeatureFlags(rpc.getRequiredFeatures())
+ .setRemoteMethod(
+ RpcHeader.RemoteMethodPB.newBuilder()
+ .setServiceName(rpc.serviceName())
+ .setMethodName(rpc.method()));
+ final Message reqPB = rpc.createRequestPB();
+
+ if (rpc.deadlineTracker.hasDeadline()) {
+ headerBuilder.setTimeoutMillis((int) rpc.deadlineTracker.getMillisBeforeDeadline());
+ }
+
+ if (rpc.isRequestTracked()) {
+ RpcHeader.RequestIdPB.Builder requestIdBuilder = RpcHeader.RequestIdPB.newBuilder();
+ final RequestTracker requestTracker = client.getRequestTracker();
+ if (rpc.getSequenceId() == RequestTracker.NO_SEQ_NO) {
+ rpc.setSequenceId(requestTracker.newSeqNo());
+ }
+ requestIdBuilder.setClientId(requestTracker.getClientId());
+ requestIdBuilder.setSeqNo(rpc.getSequenceId());
+ requestIdBuilder.setAttemptNo(rpc.attempt);
+ requestIdBuilder.setFirstIncompleteSeqNo(requestTracker.firstIncomplete());
+ headerBuilder.setRequestId(requestIdBuilder);
+ }
+
+ return new RpcOutboundMessage(headerBuilder, reqPB);
+ }
+
+ private static <R> void responseReceived(AsyncKuduClient client,
+ Connection connection,
+ final KuduRpc<R> rpc,
+ CallResponse response,
+ KuduException ex) {
+ Preconditions.checkNotNull(rpc);
+
+ final long start = System.nanoTime();
+ if (LOG.isTraceEnabled()) {
+ if (response == null) {
+ LOG.trace("{} received null response for RPC {}",
+ connection.getLogPrefix(), rpc);
+ } else {
+ RpcHeader.ResponseHeader header = response.getHeader();
+ Preconditions.checkNotNull(header);
+ LOG.trace("{} received response with rpcId {}, size {} for RPC {}",
+ connection.getLogPrefix(), header.getCallId(),
+ response.getTotalResponseSize(), rpc);
+ }
+ }
+
+ RpcTraceFrame.RpcTraceFrameBuilder traceBuilder = new RpcTraceFrame.RpcTraceFrameBuilder(
+ rpc.method(), RpcTraceFrame.Action.RECEIVE_FROM_SERVER).serverInfo(
+ connection.getServerInfo());
+ if (ex != null) {
+ if (ex instanceof RecoverableException) {
+ // This check is specifically for ERROR_SERVER_TOO_BUSY, ERROR_UNAVAILABLE and the like.
+ failOrRetryRpc(client, connection, rpc, (RecoverableException) ex);
+ return;
+ }
+ rpc.addTrace(traceBuilder.callStatus(ex.getStatus()).build());
+ rpc.errback(ex);
+ return;
+ }
+
+ Pair<R, Object> decoded = null;
+ KuduException exception = null;
+ try {
+ decoded = rpc.deserialize(response, connection.getServerInfo().getUuid());
+ } catch (KuduException e) {
+ exception = e;
+ } catch (Exception e) {
+ rpc.addTrace(traceBuilder.build());
+ rpc.errback(e);
+ return;
+ }
+
+ // We can get this Message from within the RPC's expected type,
+ // so convert it into an exception and nullify decoded so that we use the errback route.
+ // Have to do it for both TS and Master errors.
+ if (decoded != null) {
+ if (decoded.getSecond() instanceof Tserver.TabletServerErrorPB) {
+ Tserver.TabletServerErrorPB error = (Tserver.TabletServerErrorPB) decoded.getSecond();
+ exception = dispatchTSError(client, connection, rpc, error, traceBuilder);
+ if (exception == null) {
+ // It was taken care of.
+ return;
+ } else {
+ // We're going to errback.
+ decoded = null;
+ }
+ } else if (decoded.getSecond() instanceof Master.MasterErrorPB) {
+ Master.MasterErrorPB error = (Master.MasterErrorPB) decoded.getSecond();
+ exception = dispatchMasterError(client, connection, rpc, error, traceBuilder);
+ if (exception == null) {
+ // Exception was taken care of.
+ return;
+ } else {
+ decoded = null;
+ }
+ }
+ }
+
+ try {
+ if (decoded != null) {
+ Preconditions.checkState(!(decoded.getFirst() instanceof Exception));
+ if (client.isStatisticsEnabled()) {
+ rpc.updateStatistics(client.getStatistics(), decoded.getFirst());
+ }
+ rpc.addTrace(traceBuilder.callStatus(Status.OK()).build());
+ rpc.callback(decoded.getFirst());
+ } else {
+ if (client.isStatisticsEnabled()) {
+ rpc.updateStatistics(client.getStatistics(), null);
+ }
+ rpc.addTrace(traceBuilder.callStatus(exception.getStatus()).build());
+ rpc.errback(exception);
+ }
+ } catch (Exception e) {
+ RpcHeader.ResponseHeader header = response.getHeader();
+ Preconditions.checkNotNull(header);
+ LOG.debug("{} unexpected exception {} while handling call: callId {}, RPC {}",
+ connection.getLogPrefix(), e, header.getCallId(), rpc);
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("------------------<< LEAVING DECODE <<------------------" +
+ " time elapsed: " + ((System.nanoTime() - start) / 1000) + "us");
+ }
+ }
+
+ /**
+ * Takes care of a few kinds of TS errors that we handle differently, like tablets or leaders
+ * moving. Builds and returns an exception if we don't know what to do with it.
+ *
+ * @param client client object to handle response and sending retries, if needed
+ * @param connection connection to send the request over
+ * @param rpc the original RPC call that triggered the error
+ * @param error the error the TS sent
+ * @param tracer RPC trace builder to add a record on the error into the call history
+ * @return an exception if we couldn't dispatch the error, or null
+ */
+ private static KuduException dispatchTSError(AsyncKuduClient client,
+ Connection connection,
+ KuduRpc<?> rpc,
+ Tserver.TabletServerErrorPB error,
+ RpcTraceFrame.RpcTraceFrameBuilder tracer) {
+ Tserver.TabletServerErrorPB.Code errCode = error.getCode();
+ WireProtocol.AppStatusPB.ErrorCode errStatusCode = error.getStatus().getCode();
+ Status status = Status.fromTabletServerErrorPB(error);
+ if (errCode == Tserver.TabletServerErrorPB.Code.TABLET_NOT_FOUND) {
+ client.handleTabletNotFound(
+ rpc, new RecoverableException(status), connection.getServerInfo());
+ // we're not calling rpc.callback() so we rely on the client to retry that RPC
+ } else if (errCode == Tserver.TabletServerErrorPB.Code.TABLET_NOT_RUNNING ||
+ errStatusCode == WireProtocol.AppStatusPB.ErrorCode.SERVICE_UNAVAILABLE) {
+ client.handleRetryableError(rpc, new RecoverableException(status));
+ // The following two error codes are an indication that the tablet isn't a leader.
+ } else if (errStatusCode == WireProtocol.AppStatusPB.ErrorCode.ILLEGAL_STATE ||
+ errStatusCode == WireProtocol.AppStatusPB.ErrorCode.ABORTED) {
+ client.handleNotLeader(rpc, new RecoverableException(status), connection.getServerInfo());
+ } else {
+ return new NonRecoverableException(status);
+ }
+ rpc.addTrace(tracer.callStatus(status).build());
+ return null;
+ }
+
+ /**
+ * Provides different handling for various kinds of master errors: re-uses the
+ * mechanisms already in place for handling tablet server errors as much as possible.
+ *
+ * @param client client object to handle response and sending retries, if needed
+ * @param connection connection to send the request over
+ * @param rpc the original RPC call that triggered the error
+ * @param error the error the master sent
+ * @param tracer RPC trace builder to add a record on the error into the call history
+ * @return an exception if we couldn't dispatch the error, or null
+ */
+ private static KuduException dispatchMasterError(AsyncKuduClient client,
+ Connection connection,
+ KuduRpc<?> rpc,
+ Master.MasterErrorPB error,
+ RpcTraceFrame.RpcTraceFrameBuilder tracer) {
+
+ WireProtocol.AppStatusPB.ErrorCode code = error.getStatus().getCode();
+ Status status = Status.fromMasterErrorPB(error);
+ if (error.getCode() == Master.MasterErrorPB.Code.NOT_THE_LEADER) {
+ client.handleNotLeader(rpc, new RecoverableException(status), connection.getServerInfo());
+ } else if (code == WireProtocol.AppStatusPB.ErrorCode.SERVICE_UNAVAILABLE) {
+ if (rpc instanceof ConnectToMasterRequest) {
+ // Special case:
+ // We never want to retry this RPC, we only use it to poke masters to learn where the leader
+ // is. If the error is truly non recoverable, it'll be handled later.
+ return new RecoverableException(status);
+ } else {
+ // TODO: This is a crutch until we either don't have to retry RPCs going to the
+ // same server or use retry policies.
+ client.handleRetryableError(rpc, new RecoverableException(status));
+ }
+ } else {
+ return new NonRecoverableException(status);
+ }
+ rpc.addTrace(tracer.callStatus(status).build());
+ return null;
+ }
+
+ /**
+ * Retry the given RPC, or fail it right away if no tablet is associated with it.
+ *
+ * @param client client object to handle response and sending retries, if needed
+ * @param connection connection to send the request over
+ * @param rpc an RPC to retry or fail
+ * @param exception an exception to propagate with the RPC
+ */
+ private static void failOrRetryRpc(AsyncKuduClient client,
+ Connection connection,
+ final KuduRpc<?> rpc,
+ final RecoverableException exception) {
+ rpc.addTrace(new RpcTraceFrame.RpcTraceFrameBuilder(rpc.method(),
+ RpcTraceFrame.Action.RECEIVE_FROM_SERVER)
+ .serverInfo(connection.getServerInfo())
+ .callStatus(exception.getStatus())
+ .build());
+
+ RemoteTablet tablet = rpc.getTablet();
+ // Note: as of the time of writing (03/11/16), a null tablet doesn't make sense; if we see a
+ // null tablet, it's because we didn't set it properly before calling sendRpc().
+ if (tablet == null) { // Can't retry, dunno where this RPC should go.
+ rpc.errback(exception);
+ } else {
+ client.handleTabletNotFound(rpc, exception, connection.getServerInfo());
+ }
+ }
+
+ /**
+ * @return string representation of the object suitable for printing into logs, etc.
+ */
+ public String toString() {
+ return "RpcProxy@" + hashCode() + ", connection=" + connection;
+ }
+
+ /**
+ * @return underlying {@link Connection} object representing TCP connection to the server
+ */
+ @VisibleForTesting
+ Connection getConnection() {
+ return connection;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/main/java/org/apache/kudu/client/ServerInfo.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ServerInfo.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ServerInfo.java
index 84e4292..6dc86e9 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ServerInfo.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ServerInfo.java
@@ -20,6 +20,7 @@ package org.apache.kudu.client;
import java.net.InetAddress;
import java.util.concurrent.ConcurrentHashMap;
+import com.google.common.base.Preconditions;
import com.google.common.net.HostAndPort;
import org.apache.yetus.audience.InterfaceAudience;
@@ -35,7 +36,7 @@ public class ServerInfo {
private final InetAddress resolvedAddr;
private final boolean local;
private static final ConcurrentHashMap<InetAddress, Boolean> isLocalAddressCache =
- new ConcurrentHashMap<>();
+ new ConcurrentHashMap<>();
/**
* Constructor for all the fields. The intent is that there should only be one ServerInfo
@@ -45,6 +46,7 @@ public class ServerInfo {
* @param resolvedAddr resolved address used to check if the server is local
*/
public ServerInfo(String uuid, HostAndPort hostPort, InetAddress resolvedAddr) {
+ Preconditions.checkNotNull(uuid);
this.uuid = uuid;
this.hostPort = hostPort;
this.resolvedAddr = resolvedAddr;
http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
deleted file mode 100644
index 9345408..0000000
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
+++ /dev/null
@@ -1,779 +0,0 @@
-/*
- * Copyright (C) 2010-2012 The Async HBase Authors. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- * - Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * - Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- * - Neither the name of the StumbleUpon nor the names of its contributors
- * may be used to endorse or promote products derived from this software
- * without specific prior written permission.
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-package org.apache.kudu.client;
-
-import java.nio.channels.ClosedChannelException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.locks.ReentrantLock;
-import javax.annotation.concurrent.GuardedBy;
-import javax.net.ssl.SSLException;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.protobuf.Message;
-import com.stumbleupon.async.Deferred;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelEvent;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.jboss.netty.handler.timeout.ReadTimeoutException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.kudu.WireProtocol;
-import org.apache.kudu.client.Negotiator.Result;
-import org.apache.kudu.master.Master;
-import org.apache.kudu.rpc.RpcHeader;
-import org.apache.kudu.tserver.Tserver;
-import org.apache.kudu.util.Pair;
-
-/**
- * Stateful handler that manages a connection to a specific TabletServer.
- * <p>
- * This handler manages the RPC IDs, the serialization and de-serialization of
- * RPC requests and responses, and keeps track of the RPC in flights for which
- * a response is currently awaited, as well as temporarily buffered RPCs that
- * are awaiting to be sent to the network.
- * <p>
- * This class needs careful synchronization. It's a non-sharable handler,
- * meaning there is one instance of it per Netty {@link Channel} and each
- * instance is only used by one Netty IO thread at a time. At the same time,
- * {@link AsyncKuduClient} calls methods of this class from random threads at
- * random times. The bottom line is that any data only used in the Netty IO
- * threads doesn't require synchronization, everything else does.
- * <p>
- * Acquiring the monitor on an object of this class will prevent it from
- * accepting write requests as well as buffering requests if the underlying
- * channel isn't connected.
- */
-@InterfaceAudience.Private
-public class TabletClient extends SimpleChannelUpstreamHandler {
-
- public static final Logger LOG = LoggerFactory.getLogger(TabletClient.class);
-
- public static final byte RPC_CURRENT_VERSION = 9;
- /** Initial header sent by the client upon connection establishment */
- private static final byte[] CONNECTION_HEADER = new byte[] { 'h', 'r', 'p', 'c',
- RPC_CURRENT_VERSION, // RPC version.
- 0,
- 0
- };
-
- /**
- * The channel we're connected to. This is set very soon after construction
- * before any other events can arrive.
- */
- private Channel chan;
-
- /** Lock for several of the below fields. */
- private final ReentrantLock lock = new ReentrantLock();
-
- /**
- * RPCs which are waiting to be sent once a connection has been
- * established.
- *
- * This is non-null until the connection is connected, at which point
- * any pending RPCs will be sent, and the variable set to null.
- */
- @GuardedBy("lock")
- ArrayList<KuduRpc<?>> pendingRpcs = Lists.newArrayList();
-
- /**
- * A monotonically increasing counter for RPC IDs.
- */
- @GuardedBy("lock")
- private int nextCallId = 0;
-
- private static enum State {
- NEGOTIATING,
- ALIVE,
- DISCONNECTED
- }
-
- @GuardedBy("lock")
- private State state = State.NEGOTIATING;
-
- @GuardedBy("lock")
- private HashMap<Integer, KuduRpc<?>> rpcsInflight = new HashMap<>();
-
- @GuardedBy("lock")
- private Result negotiationResult;
-
- private final AsyncKuduClient kuduClient;
-
- private final long socketReadTimeoutMs;
-
- private final RequestTracker requestTracker;
-
- private final ServerInfo serverInfo;
-
- /**
- * Set to true when the client initiates a disconnect. The channelDisconnected
- * event handler then knows not to log any warning about unexpected disconnection
- * from the peer.
- */
- private volatile boolean closedByClient;
-
- public TabletClient(AsyncKuduClient client, ServerInfo serverInfo) {
- this.kuduClient = client;
- this.socketReadTimeoutMs = client.getDefaultSocketReadTimeoutMs();
- this.requestTracker = client.getRequestTracker();
- this.serverInfo = serverInfo;
- }
-
- <R> void sendRpc(KuduRpc<R> rpc) {
- Preconditions.checkArgument(rpc.hasDeferred());
- rpc.addTrace(
- new RpcTraceFrame.RpcTraceFrameBuilder(
- rpc.method(),
- RpcTraceFrame.Action.SEND_TO_SERVER)
- .serverInfo(serverInfo)
- .build());
-
- if (!rpc.deadlineTracker.hasDeadline()) {
- LOG.warn(getPeerUuidLoggingString() + " sending an rpc without a timeout " + rpc);
- }
-
- // Serialize the request outside the lock.
- Message req;
- try {
- req = rpc.createRequestPB();
- } catch (Exception e) {
- LOG.error("Uncaught exception while constructing RPC request: " + rpc, e);
- rpc.errback(e); // Make the RPC fail with the exception.
- return;
- }
-
- lock.lock();
- boolean needsUnlock = true;
- try {
- // If we are disconnected, immediately fail the RPC
- if (state == State.DISCONNECTED) {
- lock.unlock();
- needsUnlock = false;
- Status statusNetworkError =
- Status.NetworkError(getPeerUuidLoggingString() + "Connection reset");
- failOrRetryRpc(rpc, new RecoverableException(statusNetworkError));
- return;
- }
-
- // We're still negotiating -- just add it to the pending list
- // which will be sent when the negotiation either completes or
- // fails.
- if (state == State.NEGOTIATING) {
- pendingRpcs.add(rpc);
- return;
- }
-
- // We are alive, in which case we must have a channel.
- assert state == State.ALIVE;
- assert chan != null;
-
- // Check that the server supports feature flags, if our RPC uses them.
- if (!rpc.getRequiredFeatures().isEmpty() &&
- !negotiationResult.serverFeatures.contains(
- RpcHeader.RpcFeatureFlag.APPLICATION_FEATURE_FLAGS)) {
- // We don't want to call out of this class while holding the lock.
- lock.unlock();
- needsUnlock = false;
-
- Status statusNotSupported = Status.NotSupported("the server does not support the" +
- "APPLICATION_FEATURE_FLAGS RPC feature");
- rpc.errback(new NonRecoverableException(statusNotSupported));
- return;
- }
-
- // Assign the call ID and write it to the wire.
- sendCallToWire(rpc, req);
- } finally {
- if (needsUnlock) {
- lock.unlock();
- }
- }
- }
-
- @GuardedBy("lock")
- private <R> void sendCallToWire(final KuduRpc<R> rpc, Message reqPB) {
- assert lock.isHeldByCurrentThread();
- assert state == State.ALIVE;
- assert chan != null;
-
- int callId = nextCallId++;
-
- final RpcHeader.RequestHeader.Builder headerBuilder = RpcHeader.RequestHeader.newBuilder()
- .addAllRequiredFeatureFlags(rpc.getRequiredFeatures())
- .setCallId(callId)
- .setRemoteMethod(
- RpcHeader.RemoteMethodPB.newBuilder()
- .setServiceName(rpc.serviceName())
- .setMethodName(rpc.method()));
-
- // If any timeout is set, find the lowest non-zero one, since this will be the deadline that
- // the server must respect.
- if (rpc.deadlineTracker.hasDeadline() || socketReadTimeoutMs > 0) {
- long millisBeforeDeadline = Long.MAX_VALUE;
- if (rpc.deadlineTracker.hasDeadline()) {
- millisBeforeDeadline = rpc.deadlineTracker.getMillisBeforeDeadline();
- }
-
- long localRpcTimeoutMs = Long.MAX_VALUE;
- if (socketReadTimeoutMs > 0) {
- localRpcTimeoutMs = socketReadTimeoutMs;
- }
-
- headerBuilder.setTimeoutMillis((int) Math.min(millisBeforeDeadline, localRpcTimeoutMs));
- }
-
- if (rpc.isRequestTracked()) {
- RpcHeader.RequestIdPB.Builder requestIdBuilder = RpcHeader.RequestIdPB.newBuilder();
- if (rpc.getSequenceId() == RequestTracker.NO_SEQ_NO) {
- rpc.setSequenceId(requestTracker.newSeqNo());
- }
- requestIdBuilder.setClientId(requestTracker.getClientId());
- requestIdBuilder.setSeqNo(rpc.getSequenceId());
- requestIdBuilder.setAttemptNo(rpc.attempt);
- requestIdBuilder.setFirstIncompleteSeqNo(requestTracker.firstIncomplete());
- headerBuilder.setRequestId(requestIdBuilder);
- }
-
- final KuduRpc<?> oldrpc = rpcsInflight.put(callId, rpc);
- if (oldrpc != null) {
- final String wtf = getPeerUuidLoggingString() +
- "Unexpected state: there was already an RPC in flight with" +
- " callId=" + callId + ": " + oldrpc +
- ". This happened when sending out: " + rpc;
- LOG.error(wtf);
- Status statusIllegalState = Status.IllegalState(wtf);
- // Make it fail. This isn't an expected failure mode.
- oldrpc.errback(new NonRecoverableException(statusIllegalState));
- return;
- }
-
- RpcOutboundMessage outbound = new RpcOutboundMessage(headerBuilder, reqPB);
- Channels.write(chan, outbound);
- }
-
- /**
- * Triggers the channel to be disconnected, which will asynchronously cause all
- * pending and in-flight RPCs to be failed. This method is idempotent.
- */
- @VisibleForTesting
- ChannelFuture disconnect() {
- // 'chan' should never be null, because as soon as this object is created, it's
- // added to a ChannelPipeline, which synchronously fires the channelOpen()
- // event.
- Preconditions.checkNotNull(chan);
- closedByClient = true;
- return Channels.disconnect(chan);
- }
-
- /**
- * Forcefully shuts down the connection to this tablet server and fails all the outstanding RPCs.
- * Only use when shutting down a client.
- * @return deferred object to use to track the shutting down of this connection
- */
- public Deferred<Void> shutdown() {
- ChannelFuture disconnectFuture = disconnect();
- final Deferred<Void> d = new Deferred<Void>();
- disconnectFuture.addListener(new ChannelFutureListener() {
- public void operationComplete(final ChannelFuture future) {
- if (future.isSuccess()) {
- d.callback(null);
- return;
- }
- final Throwable t = future.getCause();
- if (t instanceof Exception) {
- d.callback(t);
- } else {
- // Wrap the Throwable because Deferred doesn't handle Throwables,
- // it only uses Exception.
- Status statusIllegalState = Status.IllegalState("Failed to shutdown: " +
- TabletClient.this);
- d.callback(new NonRecoverableException(statusIllegalState, t));
- }
- }
- });
- return d;
- }
-
- /**
- * The reason we are suppressing the unchecked conversions is because the KuduRpc is coming
- * from a collection that has RPCs with different generics, and there's no way to get "decoded"
- * casted correctly. The best we can do is to rely on the RPC to decode correctly,
- * and to not pass an Exception in the callback.
- */
- @Override
- @SuppressWarnings("unchecked")
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt) throws Exception {
- Object m = evt.getMessage();
- if (m instanceof Negotiator.Result) {
- ArrayList<KuduRpc<?>> queuedRpcs;
- lock.lock();
- try {
- assert chan != null;
- this.negotiationResult = (Result) m;
- state = State.ALIVE;
- queuedRpcs = pendingRpcs;
- pendingRpcs = null;
- } finally {
- lock.unlock();
- }
- // Send the queued RPCs after dropping the lock, so we don't end up calling
- // their callbacks/errbacks with the lock held.
- //
- // queuedRpcs may be null in the case that we disconnected the client just
- // at the same time as the Negotiator was finishing up.
- if (queuedRpcs != null) {
- sendQueuedRpcs(queuedRpcs);
- }
- return;
- }
- if (!(m instanceof CallResponse)) {
- ctx.sendUpstream(evt);
- return;
- }
- CallResponse response = (CallResponse)m;
- final long start = System.nanoTime();
-
- RpcHeader.ResponseHeader header = response.getHeader();
- if (!header.hasCallId()) {
- final int size = response.getTotalResponseSize();
- final String msg = getPeerUuidLoggingString() + "RPC response (size: " + size + ") doesn't" +
- " have a call ID: " + header;
- LOG.error(msg);
- Status statusIncomplete = Status.Incomplete(msg);
- throw new NonRecoverableException(statusIncomplete);
- }
- final int rpcid = header.getCallId();
-
- KuduRpc<Object> rpc;
- lock.lock();
- try {
- rpc = (KuduRpc<Object>) rpcsInflight.remove(rpcid);
- } finally {
- lock.unlock();
- }
-
- if (rpc == null) {
- final String msg = getPeerUuidLoggingString() + "Invalid rpcid: " + rpcid;
- LOG.error(msg);
- // If we get a bad RPC ID back, we are probably somehow misaligned from
- // the server. So, we disconnect the connection.
- throw new NonRecoverableException(Status.IllegalState(msg));
- }
-
- // Start building the trace, we'll finish it as we parse the response.
- RpcTraceFrame.RpcTraceFrameBuilder traceBuilder =
- new RpcTraceFrame.RpcTraceFrameBuilder(
- rpc.method(),
- RpcTraceFrame.Action.RECEIVE_FROM_SERVER)
- .serverInfo(serverInfo);
-
- Pair<Object, Object> decoded = null;
- KuduException exception = null;
- Status retryableHeaderError = Status.OK();
- if (header.hasIsError() && header.getIsError()) {
- RpcHeader.ErrorStatusPB.Builder errorBuilder = RpcHeader.ErrorStatusPB.newBuilder();
- KuduRpc.readProtobuf(response.getPBMessage(), errorBuilder);
- RpcHeader.ErrorStatusPB error = errorBuilder.build();
- if (error.getCode().equals(RpcHeader.ErrorStatusPB.RpcErrorCodePB.ERROR_SERVER_TOO_BUSY)) {
- // We can't return right away, we still need to remove ourselves from 'rpcsInflight', so we
- // populate 'retryableHeaderError'.
- retryableHeaderError = Status.ServiceUnavailable(error.getMessage());
- } else {
- String message = getPeerUuidLoggingString() +
- "Tablet server sent error " + error.getMessage();
- Status status = Status.RemoteError(message);
- exception = new RpcRemoteException(status, error);
- LOG.error(message); // can be useful
- }
- } else {
- try {
- decoded = rpc.deserialize(response, this.serverInfo.getUuid());
- } catch (KuduException ex) {
- exception = ex;
- }
- }
- if (LOG.isTraceEnabled()) {
- LOG.trace(getPeerUuidLoggingString() + " received RPC response: " +
- "rpcId=" + rpcid +
- ", response size=" + response.getTotalResponseSize() +
- ", rpc=" + rpc);
- }
-
- // This check is specifically for the ERROR_SERVER_TOO_BUSY case above.
- if (!retryableHeaderError.ok()) {
- rpc.addTrace(traceBuilder.callStatus(retryableHeaderError).build());
- kuduClient.handleRetryableError(rpc, new RecoverableException(retryableHeaderError));
- return;
- }
-
- // We can get this Message from within the RPC's expected type,
- // so convert it into an exception and nullify decoded so that we use the errback route.
- // Have to do it for both TS and Master errors.
- if (decoded != null) {
- if (decoded.getSecond() instanceof Tserver.TabletServerErrorPB) {
- Tserver.TabletServerErrorPB error = (Tserver.TabletServerErrorPB) decoded.getSecond();
- exception = dispatchTSErrorOrReturnException(rpc, error, traceBuilder);
- if (exception == null) {
- // It was taken care of.
- return;
- } else {
- // We're going to errback.
- decoded = null;
- }
-
- } else if (decoded.getSecond() instanceof Master.MasterErrorPB) {
- Master.MasterErrorPB error = (Master.MasterErrorPB) decoded.getSecond();
- exception = dispatchMasterErrorOrReturnException(rpc, error, traceBuilder);
- if (exception == null) {
- // Exception was taken care of.
- return;
- } else {
- decoded = null;
- }
- }
- }
-
- try {
- if (decoded != null) {
- assert !(decoded.getFirst() instanceof Exception);
- if (kuduClient.isStatisticsEnabled()) {
- rpc.updateStatistics(kuduClient.getStatistics(), decoded.getFirst());
- }
- rpc.addTrace(traceBuilder.callStatus(Status.OK()).build());
- rpc.callback(decoded.getFirst());
- } else {
- if (kuduClient.isStatisticsEnabled()) {
- rpc.updateStatistics(kuduClient.getStatistics(), null);
- }
- rpc.addTrace(traceBuilder.callStatus(exception.getStatus()).build());
- rpc.errback(exception);
- }
- } catch (Exception e) {
- LOG.debug(getPeerUuidLoggingString() + "Unexpected exception while handling RPC #" + rpcid +
- ", rpc=" + rpc, e);
- }
- if (LOG.isTraceEnabled()) {
- LOG.trace("------------------<< LEAVING DECODE <<------------------" +
- " time elapsed: " + ((System.nanoTime() - start) / 1000) + "us");
- }
- }
-
- /**
- * Takes care of a few kinds of TS errors that we handle differently, like tablets or leaders
- * moving. Builds and returns an exception if we don't know what to do with it.
- * @param rpc the original RPC call that triggered the error
- * @param error the error the TS sent
- * @return an exception if we couldn't dispatch the error, or null
- */
- private KuduException dispatchTSErrorOrReturnException(
- KuduRpc<?> rpc, Tserver.TabletServerErrorPB error,
- RpcTraceFrame.RpcTraceFrameBuilder traceBuilder) {
- Tserver.TabletServerErrorPB.Code errCode = error.getCode();
- WireProtocol.AppStatusPB.ErrorCode errStatusCode = error.getStatus().getCode();
- Status status = Status.fromTabletServerErrorPB(error);
- if (errCode == Tserver.TabletServerErrorPB.Code.TABLET_NOT_FOUND) {
- kuduClient.handleTabletNotFound(rpc, new RecoverableException(status), this);
- // we're not calling rpc.callback() so we rely on the client to retry that RPC
- } else if (errCode == Tserver.TabletServerErrorPB.Code.TABLET_NOT_RUNNING ||
- errStatusCode == WireProtocol.AppStatusPB.ErrorCode.SERVICE_UNAVAILABLE) {
- kuduClient.handleRetryableError(rpc, new RecoverableException(status));
- // The following two error codes are an indication that the tablet isn't a leader.
- } else if (errStatusCode == WireProtocol.AppStatusPB.ErrorCode.ILLEGAL_STATE ||
- errStatusCode == WireProtocol.AppStatusPB.ErrorCode.ABORTED) {
- kuduClient.handleNotLeader(rpc, new RecoverableException(status), this);
- } else {
- return new NonRecoverableException(status);
- }
- rpc.addTrace(traceBuilder.callStatus(status).build());
- return null;
- }
-
- /**
- * Provides different handling for various kinds of master errors: re-uses the
- * mechanisms already in place for handling tablet server errors as much as possible.
- * @param rpc the original RPC call that triggered the error
- * @param error the error the master sent
- * @return an exception if we couldn't dispatch the error, or null
- */
- private KuduException dispatchMasterErrorOrReturnException(
- KuduRpc<?> rpc, Master.MasterErrorPB error, RpcTraceFrame.RpcTraceFrameBuilder traceBuilder) {
- WireProtocol.AppStatusPB.ErrorCode code = error.getStatus().getCode();
- Status status = Status.fromMasterErrorPB(error);
- if (error.getCode() == Master.MasterErrorPB.Code.NOT_THE_LEADER) {
- kuduClient.handleNotLeader(rpc, new RecoverableException(status), this);
- } else if (code == WireProtocol.AppStatusPB.ErrorCode.SERVICE_UNAVAILABLE) {
- if (rpc instanceof ConnectToMasterRequest) {
- // Special case:
- // We never want to retry this RPC, we only use it to poke masters to learn where the leader
- // is. If the error is truly non recoverable, it'll be handled later.
- return new RecoverableException(status);
- } else {
- // TODO: This is a crutch until we either don't have to retry RPCs going to the
- // same server or use retry policies.
- kuduClient.handleRetryableError(rpc, new RecoverableException(status));
- }
- } else {
- return new NonRecoverableException(status);
- }
- rpc.addTrace(traceBuilder.callStatus(status).build());
- return null;
- }
-
- /**
- * Tells whether or not this handler should be used.
- * <p>
- * @return true if this instance can be used, else false if this handler is known to have been
- * disconnected from the server and sending an RPC (via {@link #sendRpc(KuduRpc)}) will be
- * retried in the client right away
- */
- public boolean isAlive() {
- lock.lock();
- try {
- return state == State.ALIVE || state == State.NEGOTIATING;
- } finally {
- lock.unlock();
- }
- }
-
- @Override
- public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
- throws Exception {
- this.chan = e.getChannel();
- super.channelOpen(ctx, e);
- }
-
- @Override
- public void channelConnected(final ChannelHandlerContext ctx,
- final ChannelStateEvent e) {
- assert chan != null;
- Channels.write(chan, ChannelBuffers.wrappedBuffer(CONNECTION_HEADER));
- Negotiator negotiator = new Negotiator(serverInfo.getHostname(),
- kuduClient.getSecurityContext());
- ctx.getPipeline().addBefore(ctx.getName(), "negotiation", negotiator);
- negotiator.sendHello(chan);
- }
-
- @Override
- public void handleUpstream(final ChannelHandlerContext ctx,
- final ChannelEvent e) throws Exception {
- if (LOG.isTraceEnabled()) {
- LOG.trace(e.toString());
- }
- super.handleUpstream(ctx, e);
- }
-
- @Override
- public void channelDisconnected(final ChannelHandlerContext ctx,
- final ChannelStateEvent e) throws Exception {
- super.channelDisconnected(ctx, e); // Let the ReplayingDecoder cleanup.
- cleanup("Connection disconnected");
- }
-
- @Override
- public void channelClosed(final ChannelHandlerContext ctx,
- final ChannelStateEvent e) throws Exception {
- super.channelClosed(ctx, e);
- cleanup("Connection closed");
- }
-
- /**
- * Cleans up any outstanding or lingering RPC (used when shutting down).
- * <p>
- * All RPCs in flight will fail with a {@link RecoverableException} and
- * all edits buffered will be re-scheduled.
- *
- * @param errorMessage string to describe the cause of cleanup
- */
- private void cleanup(final String errorMessage) {
- final ArrayList<KuduRpc<?>> rpcsToFail = Lists.newArrayList();
-
- lock.lock();
- try {
- // Cleanup can be called multiple times, but we only want to run it once.
- if (state == State.DISCONNECTED) {
- assert pendingRpcs == null;
- return;
- }
- state = State.DISCONNECTED;
-
- // In case we were negotiating, we need to fail any that were waiting
- // for negotiation to complete.
- if (pendingRpcs != null) {
- rpcsToFail.addAll(pendingRpcs);
- pendingRpcs = null;
- }
-
- // Similarly, we need to fail any that were already sent and in-flight.
- if (rpcsInflight != null) {
- rpcsToFail.addAll(rpcsInflight.values());
- rpcsInflight = null;
- }
- } finally {
- lock.unlock();
- }
- Status statusNetworkError = Status.NetworkError(getPeerUuidLoggingString() +
- (errorMessage == null ? "Connection reset" : errorMessage));
- RecoverableException exception = new RecoverableException(statusNetworkError);
-
- failOrRetryRpcs(rpcsToFail, exception);
- }
-
- /**
- * Retry all the given RPCs.
- * @param rpcs a possibly empty but non-{@code null} collection of RPCs to retry or fail
- * @param exception an exception to propagate with the RPCs
- */
- private void failOrRetryRpcs(final Collection<KuduRpc<?>> rpcs,
- final RecoverableException exception) {
- for (final KuduRpc<?> rpc : rpcs) {
- failOrRetryRpc(rpc, exception);
- }
- }
-
- /**
- * Retry the given RPC.
- * @param rpc an RPC to retry or fail
- * @param exception an exception to propagate with the RPC
- */
- private void failOrRetryRpc(final KuduRpc<?> rpc,
- final RecoverableException exception) {
- rpc.addTrace(
- new RpcTraceFrame.RpcTraceFrameBuilder(
- rpc.method(),
- RpcTraceFrame.Action.RECEIVE_FROM_SERVER)
- .serverInfo(serverInfo)
- .callStatus(exception.getStatus())
- .build());
-
- RemoteTablet tablet = rpc.getTablet();
- // Note As of the time of writing (03/11/16), a null tablet doesn't make sense, if we see a null
- // tablet it's because we didn't set it properly before calling sendRpc().
- if (tablet == null) { // Can't retry, dunno where this RPC should go.
- rpc.errback(exception);
- } else {
- kuduClient.handleTabletNotFound(rpc, exception, this);
- }
- }
-
-
- @Override
- public void exceptionCaught(final ChannelHandlerContext ctx,
- final ExceptionEvent event) {
- final Throwable e = event.getCause();
- final Channel c = event.getChannel();
-
- if (e instanceof RejectedExecutionException) {
- LOG.warn(getPeerUuidLoggingString() + "RPC rejected by the executor," +
- " ignore this if we're shutting down", e);
- } else if (e instanceof ReadTimeoutException) {
- LOG.debug(getPeerUuidLoggingString() + "Encountered a read timeout, will close the channel");
- } else if (e instanceof ClosedChannelException) {
- if (!closedByClient) {
- LOG.info(getPeerUuidLoggingString() + "Lost connection to peer");
- }
- } else if (e instanceof SSLException && closedByClient) {
- // There's a race in Netty where, when we call Channel.close(), it tries
- // to send a TLS 'shutdown' message and enters a shutdown state. If another
- // thread races to send actual data on the channel, then Netty will get a
- // bit confused that we are trying to send data and misinterpret it as a
- // renegotiation attempt, and throw an SSLException. So, we just ignore any
- // SSLException if we've already attempted to close.
- } else {
- LOG.error(getPeerUuidLoggingString() + "Unexpected exception from downstream on " + c, e);
- }
- if (c.isOpen()) {
- Channels.close(c); // Will trigger channelClosed(), which will cleanup()
- } else { // else: presumably a connection timeout.
- cleanup(e.getMessage()); // => need to cleanup() from here directly.
- }
- }
-
-
- /**
- * Sends the queued RPCs to the server, once we're connected to it.
- * This gets called after {@link #channelConnected}, once we were able to
- * handshake with the server
- *
- * Must *not* be called with 'lock' held.
- */
- private void sendQueuedRpcs(List<KuduRpc<?>> rpcs) {
- assert !lock.isHeldByCurrentThread();
- for (final KuduRpc<?> rpc : rpcs) {
- LOG.debug(getPeerUuidLoggingString() + "Executing RPC queued: " + rpc);
- sendRpc(rpc);
- }
- }
-
- private String getPeerUuidLoggingString() {
- return "[Peer " + serverInfo.getUuid() + "] ";
- }
-
- ServerInfo getServerInfo() {
- return serverInfo;
- }
-
- public String toString() {
- final StringBuilder buf = new StringBuilder();
- buf.append("TabletClient@")
- .append(hashCode())
- .append("(chan=")
- .append(chan)
- .append(", uuid=")
- .append(serverInfo.getUuid())
- .append(", #pending_rpcs=");
- int npendingRpcs;
- int nInFlight;
- lock.lock();
- try {
- npendingRpcs = pendingRpcs == null ? 0 : pendingRpcs.size();
- nInFlight = rpcsInflight == null ? 0 : rpcsInflight.size();
- } finally {
- lock.unlock();
- }
- buf.append(npendingRpcs);
- buf.append(", #rpcs_inflight=")
- .append(nInFlight)
- .append(')');
- return buf.toString();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java b/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
index 379fadf..1caa21c 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
@@ -315,13 +315,15 @@ public class BaseKuduTest {
static final Callback<Object, Object> defaultErrorCB = new Callback<Object, Object>() {
@Override
public Object call(Object arg) throws Exception {
- if (arg == null) return null;
+ if (arg == null) {
+ return null;
+ }
if (arg instanceof Exception) {
LOG.warn("Got exception", (Exception) arg);
} else {
LOG.warn("Got an error response back {}", arg);
}
- return new Exception("Can't recover from error, see previous WARN");
+ return new Exception("cannot recover from error: " + arg);
}
};
http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
index 97d5928..a9501a7 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
@@ -180,12 +180,11 @@ public class ITClient extends BaseKuduTest {
*/
private boolean disconnectNode() {
try {
- if (localAsyncClient.getTabletClients().size() == 0) {
+ final List<Connection> connections = localAsyncClient.getConnectionListCopy();
+ if (connections.isEmpty()) {
return true;
}
-
- int tsToDisconnect = random.nextInt(localAsyncClient.getTabletClients().size());
- localAsyncClient.getTabletClients().get(tsToDisconnect).disconnect();
+ connections.get(random.nextInt(connections.size())).disconnect();
} catch (Exception e) {
if (KEEP_RUNNING_LATCH.getCount() == 0) {
http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/test/java/org/apache/kudu/client/ITFaultTolerantScanner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/ITFaultTolerantScanner.java b/java/kudu-client/src/test/java/org/apache/kudu/client/ITFaultTolerantScanner.java
index 1e243a6..bb9791b 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/ITFaultTolerantScanner.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/ITFaultTolerantScanner.java
@@ -26,20 +26,11 @@ import org.junit.Test;
public class ITFaultTolerantScanner extends ITScannerMultiTablet {
/**
* Verifies for fault tolerant scanner, it can proceed
- * properly even if shuts down client connection.
- */
- @Test(timeout = 100000)
- public void testFaultTolerantShutDown() throws KuduException {
- clientFaultInjection(true, true);
- }
-
- /**
- * Verifies for fault tolerant scanner, it can proceed
* properly even if disconnects client connection.
*/
@Test(timeout = 100000)
public void testFaultTolerantDisconnect() throws KuduException {
- clientFaultInjection(false, true);
+ clientFaultInjection(true);
}
/**
http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/test/java/org/apache/kudu/client/ITNonFaultTolerantScanner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/ITNonFaultTolerantScanner.java b/java/kudu-client/src/test/java/org/apache/kudu/client/ITNonFaultTolerantScanner.java
index 0908da4..0f196fb 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/ITNonFaultTolerantScanner.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/ITNonFaultTolerantScanner.java
@@ -54,8 +54,8 @@ public class ITNonFaultTolerantScanner extends ITScannerMultiTablet {
* properly even if shuts down client connection.
*/
@Test(timeout = 100000)
- public void testNonFaultTolerantShutDown() throws KuduException {
- clientFaultInjection(true, false);
+ public void testNonFaultTolerantDisconnect() throws KuduException {
+ clientFaultInjection(false);
}
/**
http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java b/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java
index c9b930e..6622d5d 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java
@@ -155,15 +155,13 @@ public class ITScannerMultiTablet extends BaseKuduTest {
}
/**
- * Injecting failures (disconnect or shutdown client connection) while scanning, to verify:
+ * Injecting failures (i.e. drop client connection) while scanning, to verify:
* both non-fault tolerant scanner and fault tolerant scanner will continue scan as expected.
*
- * @param shutDown if true shutdown client connection, otherwise disconnect
- * @param isFaultTolerant if true uses fault tolerant scanner, otherwise
- * uses non fault-tolerant one
+ * @param isFaultTolerant if true use fault-tolerant scanner, otherwise use non-fault-tolerant one
* @throws Exception
*/
- void clientFaultInjection(boolean shutDown, boolean isFaultTolerant) throws KuduException {
+ void clientFaultInjection(boolean isFaultTolerant) throws KuduException {
KuduScanner scanner = syncClient.newScannerBuilder(table)
.setFaultTolerant(isFaultTolerant)
.batchSizeBytes(1)
@@ -178,15 +176,10 @@ public class ITScannerMultiTablet extends BaseKuduTest {
rowCount += rri.getNumRows();
}
- // Forcefully shutdowns/disconnects the current connection and
- // fails all outstanding RPCs in the middle of scanning.
- if (shutDown) {
- client.shutdownConnection(scanner.currentTablet(),
- scanner.getReplicaSelection());
- } else {
- client.disconnect(scanner.currentTablet(),
- scanner.getReplicaSelection());
- }
+ // Forcefully disconnects the current connection and fails all outstanding RPCs
+ // in the middle of scanning.
+ client.newRpcProxy(scanner.currentTablet().getReplicaSelectedServerInfo(
+ scanner.getReplicaSelection())).getConnection().disconnect();
while (scanner.hasMoreRows()) {
loopCount++;
http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java b/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
index 16f9b79..eab429b 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
@@ -22,7 +22,6 @@ import java.io.InputStreamReader;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -56,7 +55,7 @@ public class MiniKuduCluster implements AutoCloseable {
private static final int PORT_START = 64030;
// List of threads that print
- private final List<Thread> PROCESS_INPUT_PRINTERS = new ArrayList<>();
+ private final List<Thread> processInputPrinters = new ArrayList<>();
// Map of ports to master servers.
private final Map<Integer, Process> masterProcesses = new ConcurrentHashMap<>();
@@ -337,7 +336,7 @@ public class MiniKuduCluster implements AutoCloseable {
Thread thread = new Thread(printer);
thread.setDaemon(true);
thread.setName(Iterables.getLast(Splitter.on(File.separatorChar).split(command.get(0))) + ":" + port);
- PROCESS_INPUT_PRINTERS.add(thread);
+ processInputPrinters.add(thread);
thread.start();
Thread.sleep(300);
@@ -409,7 +408,7 @@ public class MiniKuduCluster implements AutoCloseable {
return;
}
LOG.info("Killing server at port " + port);
- destroyAndWaitForProcess(ts);
+ terminateAndWait(ts);
}
/**
@@ -418,7 +417,7 @@ public class MiniKuduCluster implements AutoCloseable {
*/
public void killTabletServers() throws InterruptedException {
for (Process tserver : tserverProcesses.values()) {
- destroyAndWaitForProcess(tserver);
+ terminateAndWait(tserver);
}
tserverProcesses.clear();
}
@@ -446,7 +445,7 @@ public class MiniKuduCluster implements AutoCloseable {
return;
}
LOG.info("Killing master at port " + port);
- destroyAndWaitForProcess(master);
+ terminateAndWait(master);
}
/** {@override} */
@@ -456,36 +455,25 @@ public class MiniKuduCluster implements AutoCloseable {
}
/**
- * Stops all the processes and deletes the folders used to store data and the flagfile.
+ * Stops all the test processes; deletes the folders used to store data; deletes the flag file.
*/
public void shutdown() {
- for (Iterator<Process> masterIter = masterProcesses.values().iterator(); masterIter.hasNext(); ) {
- try {
- destroyAndWaitForProcess(masterIter.next());
- } catch (InterruptedException e) {
- // Need to continue cleaning up.
- }
- masterIter.remove();
- }
-
- for (Iterator<Process> tsIter = tserverProcesses.values().iterator(); tsIter.hasNext(); ) {
- try {
- destroyAndWaitForProcess(tsIter.next());
- } catch (InterruptedException e) {
- // Need to continue cleaning up.
- }
- tsIter.remove();
- }
+ boolean wasInterrupted = false;
+ wasInterrupted |= terminateAndWait(masterProcesses);
+ wasInterrupted |= terminateAndWait(tserverProcesses);
// Whether we were interrupted or not above we still destroyed all the processes, so the input
// printers will hit EOFs and stop.
- for (Thread thread : PROCESS_INPUT_PRINTERS) {
+ for (Thread thread : processInputPrinters) {
try {
thread.join();
} catch (InterruptedException e) {
+ wasInterrupted = true;
// Need to continue cleaning up.
+ LOG.info("ignoring request to interrupt; waiting for input printer {} to exit", thread);
}
}
+ processInputPrinters.clear();
for (String path : pathsToDelete) {
try {
@@ -507,14 +495,44 @@ public class MiniKuduCluster implements AutoCloseable {
LOG.warn("Unable to close MiniKdc", e);
}
}
+
+ if (wasInterrupted) {
+ Thread.currentThread().interrupt();
+ }
}
- private void destroyAndWaitForProcess(Process process) throws InterruptedException {
+ private static void terminateAndWait(Process process) throws InterruptedException {
process.destroy();
process.waitFor();
}
/**
+ * Terminate and wait for exit of every process in the specified container.
+ *
+ * @param processes map of processes to terminate
+ * @return true if {@link InterruptedException} was received while waiting for processes'
+ * termination, false otherwise
+ */
+ private static boolean terminateAndWait(Map<Integer, Process> processes) {
+ boolean wasInterrupted = false;
+ for (Process p : processes.values()) {
+ while (true) {
+ try {
+ terminateAndWait(p);
+ break;
+ } catch (InterruptedException e) {
+ wasInterrupted = true;
+ // Not being polite here: ignore the request to interrupt and continue cleaning up.
+ LOG.info("ignoring request to interrupt; waiting process {} to exit", p);
+ }
+ }
+ }
+ processes.clear();
+
+ return wasInterrupted;
+ }
+
+ /**
* Returns the comma-separated list of master addresses.
* @return master addresses
*/
@@ -568,8 +586,7 @@ public class MiniKuduCluster implements AutoCloseable {
LOG.info(line);
}
in.close();
- }
- catch (Exception e) {
+ } catch (Exception e) {
if (!e.getMessage().contains("Stream closed")) {
LOG.error("Caught error while reading a process' output", e);
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
index 8153328..5e33203 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
@@ -93,16 +93,16 @@ public class TestAsyncKuduClient extends BaseKuduTest {
}
private void disconnectAndWait() throws InterruptedException {
- for (TabletClient tabletClient : client.getTabletClients()) {
- tabletClient.disconnect();
+ for (Connection c : client.getConnectionListCopy()) {
+ c.disconnect();
}
Stopwatch sw = Stopwatch.createStarted();
- boolean allDead = false;
+ boolean disconnected = false;
while (sw.elapsed(TimeUnit.MILLISECONDS) < DEFAULT_SLEEP) {
boolean sleep = false;
- if (!client.getTabletClients().isEmpty()) {
- for (TabletClient tserver : client.getTabletClients()) {
- if (tserver.isAlive()) {
+ if (!client.getConnectionListCopy().isEmpty()) {
+ for (Connection c : client.getConnectionListCopy()) {
+ if (!c.isDisconnected()) {
sleep = true;
break;
}
@@ -112,11 +112,11 @@ public class TestAsyncKuduClient extends BaseKuduTest {
if (sleep) {
Thread.sleep(50);
} else {
- allDead = true;
+ disconnected = true;
break;
}
}
- assertTrue(allDead);
+ assertTrue(disconnected);
}
@Test
http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
index 41c7c27..80fe206 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
@@ -143,7 +143,7 @@ public class TestAsyncKuduSession extends BaseKuduTest {
RemoteTablet rt =
client.getTableLocationEntry(table.getTableId(), insert.partitionKey()).getTablet();
String tabletId = rt.getTabletId();
- TabletClient tc = client.getTabletClient(rt.getLeaderUUID());
+ RpcProxy proxy = client.newRpcProxy(rt.getLeaderServerInfo());
try {
// Delete table so we get table not found error.
client.deleteTable(TABLE_NAME).join();
@@ -151,7 +151,7 @@ public class TestAsyncKuduSession extends BaseKuduTest {
while (true) {
ListTabletsRequest req = new ListTabletsRequest();
Deferred<ListTabletsResponse> d = req.getDeferred();
- tc.sendRpc(req);
+ proxy.sendRpc(req);
ListTabletsResponse resp = d.join();
if (!resp.getTabletsList().contains(tabletId)) {
break;
@@ -191,7 +191,7 @@ public class TestAsyncKuduSession extends BaseKuduTest {
/**
* Regression test for a bug in which, when a tablet client is disconnected
- * and we reconnect, we were previously leaking the old TabletClient
+ * and we reconnect, we were previously leaking the old RpcProxy
* object in the client2tablets map.
*/
@Test(timeout = 100000)
@@ -212,7 +212,7 @@ public class TestAsyncKuduSession extends BaseKuduTest {
session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
session.apply(createBasicSchemaInsert(nonReplicatedTable, 1)).join();
- int numClientsBefore = client.getTabletClients().size();
+ int numClientsBefore = client.getConnectionListCopy().size();
// Restart all the tablet servers.
killTabletServers();
@@ -223,7 +223,7 @@ public class TestAsyncKuduSession extends BaseKuduTest {
session.apply(createBasicSchemaInsert(nonReplicatedTable, 2)).join();
// We should not have leaked an entry in the client2tablets map.
- int numClientsAfter = client.getTabletClients().size();
+ int numClientsAfter = client.getConnectionListCopy().size();
assertEquals(numClientsBefore, numClientsAfter);
} finally {
restartTabletServers();
@@ -395,7 +395,8 @@ public class TestAsyncKuduSession extends BaseKuduTest {
assertEquals(20, countRowsInScan(scanner));
// Test removing the connection and then do a rapid set of inserts
- client.getTabletClients().get(0).shutdown().join(DEFAULT_SLEEP);
+ client.getConnectionListCopy().get(0).disconnect()
+ .awaitUninterruptibly(DEFAULT_SLEEP);
session.setMutationBufferSpace(1);
for (int i = 91; i < 101; i++) {
try {
http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
index 22010ca..2404005 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
@@ -18,11 +18,13 @@ package org.apache.kudu.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
-import java.net.InetAddress;
import java.util.List;
+import com.google.common.collect.Lists;
import com.google.common.net.HostAndPort;
import com.stumbleupon.async.Deferred;
import org.junit.Test;
@@ -33,69 +35,98 @@ public class TestConnectionCache {
@Test(timeout = 50000)
public void test() throws Exception {
- try (MiniKuduCluster cluster =
- new MiniKuduCluster.MiniKuduClusterBuilder().numMasters(3).build()) {
+ MiniKuduCluster cluster = null;
+ try {
+ cluster = new MiniKuduCluster.MiniKuduClusterBuilder().numMasters(3).build();
- AsyncKuduClient client =
+ final AsyncKuduClient client =
new AsyncKuduClient.AsyncKuduClientBuilder(cluster.getMasterAddresses()).build();
+ final List<HostAndPort> addresses = cluster.getMasterHostPorts();
- List<HostAndPort> addresses = cluster.getMasterHostPorts();
-
- // Below we ping the masters directly using TabletClient, so if they aren't ready to process
+ // Below we ping the masters directly using RpcProxy, so if they aren't ready to process
// RPCs we'll get an error. Here by listing the tables we make sure this won't happen since
// it won't return until a master leader is found.
client.getTablesList().join();
- ConnectionCache cache = new ConnectionCache(client);
+ final List<ServerInfo> serverInfos = Lists.newArrayList();
int i = 0;
for (HostAndPort hp : addresses) {
- // Ping the process so we go through the whole connection process.
- InetAddress addr = NetUtil.getInetAddress(hp.getHost());
- TabletClient conn =
- cache.newClient(new ServerInfo(i + "", hp, addr));
- pingConnection(conn);
- i++;
+ serverInfos.add(new ServerInfo("" + i, hp, NetUtil.getInetAddress(hp.getHost())));
+ ++i;
}
- assertEquals(3, cache.getImmutableTabletClientsList().size());
- assertFalse(cache.allConnectionsAreDead());
-
- TabletClient conn = cache.getClient("0");
-
- // Kill the connection.
- conn.shutdown().join();
- waitForConnectionToDie(conn);
- assertFalse(conn.isAlive());
- // Make sure the cache also knows it's dead, but that not all the connections are.
- assertFalse(cache.getClient("0").isAlive());
- assertFalse(cache.allConnectionsAreDead());
+ // Ping the process so we go through the whole connection process.
+ for (ServerInfo si : serverInfos) {
+ final RpcProxy h = client.newRpcProxy(si);
+ assertNotNull(h.getConnection());
+ pingConnection(h);
+ }
- // Test reconnecting with only the UUID.
- TabletClient newConn = cache.getLiveClient("0");
- assertFalse(conn == newConn);
- pingConnection(newConn);
+ // 1 tserver and 3 masters and 3 connections from the newRpcProxy() in the loop above.
+ assertEquals(1 + 3 + 3, client.getConnectionListCopy().size());
+ assertFalse(allConnectionsDisconnected(client));
+
+ final RpcProxy proxy = client.newRpcProxy(serverInfos.get(0));
+
+ // Disconnect from the server.
+ proxy.getConnection().disconnect().awaitUninterruptibly();
+ waitForConnectionToClose(proxy.getConnection());
+ assertTrue(proxy.getConnection().isDisconnected());
+
+ // Make sure not all the connections in the connection cache are disconnected yet. Actually,
+ // only the connection to server '0' should be disconnected.
+ assertFalse(allConnectionsDisconnected(client));
+
+ // For a new RpcProxy instance, a new connection to the same destination is established.
+ final RpcProxy newHelper = client.newRpcProxy(serverInfos.get(0));
+ final Connection newConnection = newHelper.getConnection();
+ assertNotNull(newConnection);
+ assertNotSame(proxy.getConnection(), newConnection);
+
+ // The client-->server connection should not be established at this point yet. Wait a little
+ // before checking the state of the connection: this is to check for the status of the
+ // underlying connection _after_ the negotiation is run, if a regression happens. The
+ // negotiation on the underlying connection should be run upon submitting the very first
+ // RPC via the proxy object, not upon creating RpcProxy instance (see KUDU-1878).
+ Thread.sleep(500);
+ assertFalse(newConnection.isReady());
+ pingConnection(newHelper);
+ assertTrue(newConnection.isReady());
// Test disconnecting and make sure we cleaned up all the connections.
- cache.disconnectEverything().join();
- waitForConnectionToDie(cache.getClient("0"));
- waitForConnectionToDie(cache.getClient("1"));
- waitForConnectionToDie(cache.getClient("2"));
- assertTrue(cache.allConnectionsAreDead());
+ for (Connection c : client.getConnectionListCopy()) {
+ c.disconnect().awaitUninterruptibly();
+ waitForConnectionToClose(c);
+ }
+ assertTrue(allConnectionsDisconnected(client));
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ private boolean allConnectionsDisconnected(AsyncKuduClient client) {
+ for (Connection c : client.getConnectionListCopy()) {
+ if (!c.isDisconnected()) {
+ return false;
+ }
}
+ return true;
}
- private void waitForConnectionToDie(TabletClient conn) throws InterruptedException {
+ private void waitForConnectionToClose(Connection c) throws InterruptedException {
DeadlineTracker deadlineTracker = new DeadlineTracker();
deadlineTracker.setDeadline(5000);
- while (conn.isAlive() && !deadlineTracker.timedOut()) {
+ while (!c.isDisconnected() && !deadlineTracker.timedOut()) {
Thread.sleep(250);
}
}
- private void pingConnection(TabletClient conn) throws Exception {
+ private void pingConnection(RpcProxy proxy) throws Exception {
PingRequest ping = PingRequest.makeMasterPingRequest();
Deferred<PingResponse> d = ping.getDeferred();
- conn.sendRpc(ping);
+ proxy.sendRpc(ping);
d.join();
}
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java
index bed7e4b..5516648 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java
@@ -19,6 +19,7 @@ package org.apache.kudu.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.net.InetAddress;
@@ -40,28 +41,28 @@ public class TestRemoteTablet {
RemoteTablet tablet = getTablet(2);
// Demote the wrong leader, no-op.
- assertEquals("2", tablet.getLeaderUUID());
+ assertEquals("2", tablet.getLeaderServerInfo().getUuid());
tablet.demoteLeader("1");
- assertEquals("2", tablet.getLeaderUUID());
+ assertEquals("2", tablet.getLeaderServerInfo().getUuid());
// Tablet at server 1 was deleted.
assertTrue(tablet.removeTabletClient("1"));
- assertEquals("2", tablet.getLeaderUUID());
+ assertEquals("2", tablet.getLeaderServerInfo().getUuid());
// Simulate another thread trying to remove 1.
assertFalse(tablet.removeTabletClient("1"));
// Tablet at server 0 was deleted.
assertTrue(tablet.removeTabletClient("0"));
- assertEquals("2", tablet.getLeaderUUID());
+ assertEquals("2", tablet.getLeaderServerInfo().getUuid());
// Leader was demoted.
tablet.demoteLeader("2");
- assertEquals(null, tablet.getLeaderUUID());
+ assertNull(tablet.getLeaderServerInfo());
// Simulate another thread doing the same.
tablet.demoteLeader("2");
- assertEquals(null, tablet.getLeaderUUID());
+ assertNull(tablet.getLeaderServerInfo());
}
@Test
@@ -70,11 +71,11 @@ public class TestRemoteTablet {
// Test we can remove it.
assertTrue(tablet.removeTabletClient("2"));
- assertEquals(null, tablet.getLeaderUUID());
+ assertNull(tablet.getLeaderServerInfo());
// Test demoting it doesn't break anything.
tablet.demoteLeader("2");
- assertEquals(null, tablet.getLeaderUUID());
+ assertNull(tablet.getLeaderServerInfo());
}
@Test
@@ -83,11 +84,11 @@ public class TestRemoteTablet {
// Test we can remove it.
assertTrue(tablet.removeTabletClient("0"));
- assertEquals(null, tablet.getLeaderUUID());
+ assertNull(tablet.getLeaderServerInfo());
// Test demoting it doesn't break anything.
tablet.demoteLeader("0");
- assertEquals(null, tablet.getLeaderUUID());
+ assertNull(tablet.getLeaderServerInfo());
// Test removing a server with no leader doesn't break.
assertTrue(tablet.removeTabletClient("2"));
@@ -97,7 +98,7 @@ public class TestRemoteTablet {
public void testLocalReplica() {
RemoteTablet tablet = getTablet(0, 0);
- assertEquals("0", tablet.getClosestUUID());
+ assertEquals("0", tablet.getClosestServerInfo().getUuid());
}
@Test
@@ -105,15 +106,17 @@ public class TestRemoteTablet {
RemoteTablet tablet = getTablet(0, -1);
// We just care about getting one back.
- assertNotNull(tablet.getClosestUUID());
+ assertNotNull(tablet.getClosestServerInfo().getUuid());
}
@Test
public void testReplicaSelection() {
RemoteTablet tablet = getTablet(0, 1);
- assertEquals("0", tablet.getReplicaSelectedUUID(ReplicaSelection.LEADER_ONLY));
- assertEquals("1", tablet.getReplicaSelectedUUID(ReplicaSelection.CLOSEST_REPLICA));
+ assertEquals("0",
+ tablet.getReplicaSelectedServerInfo(ReplicaSelection.LEADER_ONLY).getUuid());
+ assertEquals("1",
+ tablet.getReplicaSelectedServerInfo(ReplicaSelection.CLOSEST_REPLICA).getUuid());
}
private RemoteTablet getTablet(int leaderIndex) {
http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java
index 2a22ecd..5eca5d2 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java
@@ -29,6 +29,7 @@ public class TestTimeouts extends BaseKuduTest {
/**
* This test case tries different methods that should all timeout, while relying on the client to
* pass down the timeouts to the session and scanner.
+ * TODO(aserbin) this test is flaky; add delays on the server side to make it stable
*/
@Test(timeout = 100000)
public void testLowTimeouts() throws Exception {
[2/2] kudu git commit: [java] separating Connection
Posted by al...@apache.org.
[java] separating Connection
This patch separates lower-level, connection-related functionality
from the TabletClient class into the new Connection class.
The updated TabletClient has been renamed to RpcProxy.
Also, this patch contains other minor updates to the related code.
In addition, this patch addresses KUDU-1878.
This work is done in the context of KUDU-2013.
Change-Id: Id4ac81d9454631e7501c31576c24f85e968bb871
Reviewed-on: http://gerrit.cloudera.org:8080/7146
Tested-by: Alexey Serbin <as...@cloudera.com>
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/58248841
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/58248841
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/58248841
Branch: refs/heads/master
Commit: 58248841f213a64683ee217f025f0a38a8450f74
Parents: 7e74527
Author: Alexey Serbin <as...@cloudera.com>
Authored: Thu Jun 1 18:39:13 2017 -0700
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Wed Jul 5 20:58:46 2017 +0000
----------------------------------------------------------------------
.../org/apache/kudu/client/AsyncKuduClient.java | 277 +++----
.../apache/kudu/client/ConnectToCluster.java | 112 ++-
.../java/org/apache/kudu/client/Connection.java | 634 +++++++++++++++
.../org/apache/kudu/client/ConnectionCache.java | 269 ++-----
.../java/org/apache/kudu/client/KuduRpc.java | 6 +-
.../java/org/apache/kudu/client/Negotiator.java | 5 +-
.../org/apache/kudu/client/RemoteTablet.java | 53 +-
.../java/org/apache/kudu/client/RpcProxy.java | 406 ++++++++++
.../java/org/apache/kudu/client/ServerInfo.java | 4 +-
.../org/apache/kudu/client/TabletClient.java | 779 -------------------
.../org/apache/kudu/client/BaseKuduTest.java | 6 +-
.../java/org/apache/kudu/client/ITClient.java | 7 +-
.../kudu/client/ITFaultTolerantScanner.java | 11 +-
.../kudu/client/ITNonFaultTolerantScanner.java | 4 +-
.../kudu/client/ITScannerMultiTablet.java | 21 +-
.../org/apache/kudu/client/MiniKuduCluster.java | 73 +-
.../apache/kudu/client/TestAsyncKuduClient.java | 16 +-
.../kudu/client/TestAsyncKuduSession.java | 13 +-
.../apache/kudu/client/TestConnectionCache.java | 109 ++-
.../apache/kudu/client/TestRemoteTablet.java | 31 +-
.../org/apache/kudu/client/TestTimeouts.java | 1 +
21 files changed, 1504 insertions(+), 1333 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 019a3f5..a304d05 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -27,8 +27,10 @@
package org.apache.kudu.client;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
import static org.apache.kudu.client.ExternalConsistencyMode.CLIENT_PROPAGATED;
+import java.net.InetAddress;
import java.net.UnknownHostException;
import java.security.cert.CertificateException;
import java.util.ArrayList;
@@ -43,6 +45,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.security.auth.Subject;
@@ -67,6 +71,7 @@ import org.jboss.netty.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.kudu.Common;
import org.apache.kudu.Schema;
import org.apache.kudu.master.Master;
import org.apache.kudu.master.Master.GetTableLocationsResponsePB;
@@ -126,7 +131,8 @@ public class AsyncKuduClient implements AutoCloseable {
* a lookup of a single partition (e.g. for a write), or re-looking-up a tablet with
* stale information.
*/
- static final int FETCH_TABLETS_PER_POINT_LOOKUP = 10;
+ private static final int FETCH_TABLETS_PER_POINT_LOOKUP = 10;
+
/**
* The number of tablets to fetch from the master when looking up a range of
* tablets.
@@ -142,16 +148,18 @@ public class AsyncKuduClient implements AutoCloseable {
private final ConcurrentHashMap<String, TableLocationsCache> tableLocations =
new ConcurrentHashMap<>();
+ /** A cache to keep track of already opened connections to Kudu servers. */
private final ConnectionCache connectionCache;
@GuardedBy("sessions")
private final Set<AsyncKuduSession> sessions = new HashSet<>();
- // Since the masters also go through TabletClient, we need to treat them as if they were a normal
- // table. We'll use the following fake table name to identify places where we need special
+ // Since RPCs to the masters also go through RpcProxy, we need to treat them as if they were a
+ // normal table. We'll use the following fake table name to identify places where we need special
// handling.
+ // TODO(aserbin) clean this up
static final String MASTER_TABLE_NAME_PLACEHOLDER = "Kudu Master";
- final KuduTable masterTable;
+ private final KuduTable masterTable;
private final List<HostAndPort> masterAddresses;
private final HashedWheelTimer timer;
@@ -212,7 +220,40 @@ public class AsyncKuduClient implements AutoCloseable {
this.timer = b.timer;
String clientId = UUID.randomUUID().toString().replace("-", "");
this.requestTracker = new RequestTracker(clientId);
- this.connectionCache = new ConnectionCache(this);
+ this.connectionCache = new ConnectionCache(
+ securityContext, defaultSocketReadTimeoutMs, timer, channelFactory);
+ }
+
+ /**
+ * Get a proxy to send RPC calls to the specified server.
+ *
+ * @param serverInfo server's information
+ * @return the proxy object bound to the target server
+ */
+ @Nonnull
+ RpcProxy newRpcProxy(final ServerInfo serverInfo) {
+ Preconditions.checkNotNull(serverInfo);
+ return new RpcProxy(this, connectionCache.getConnection(serverInfo));
+ }
+
+ /**
+ * Get a proxy to send RPC calls to Kudu master at the specified end-point.
+ *
+ * @param hostPort master end-point
+ * @return the proxy object bound to the target master
+ */
+ @Nullable
+ RpcProxy newMasterRpcProxy(HostAndPort hostPort) {
+ // We should have a UUID to construct ServerInfo for the master, but we have a chicken
+ // and egg problem, we first need to communicate with the masters to find out about them,
+ // and that's what we're trying to do. The UUID is just used for logging and cache key,
+ // so instead we just use concatenation of master host and port, prefixed with "master-".
+ final InetAddress inetAddress = NetUtil.getInetAddress(hostPort.getHost());
+ if (inetAddress == null) {
+ // TODO(todd): should we log the resolution failure? throw an exception?
+ return null;
+ }
+ return newRpcProxy(new ServerInfo("master-" + hostPort.toString(), hostPort, inetAddress));
}
/**
@@ -374,7 +415,7 @@ public class AsyncKuduClient implements AutoCloseable {
return sendRpcToTablet(rpc);
}
- Deferred<GetTableSchemaResponse> getTableSchema(String name) {
+ private Deferred<GetTableSchemaResponse> getTableSchema(String name) {
GetTableSchemaRequest rpc = new GetTableSchemaRequest(this.masterTable, name);
rpc.setTimeoutMillis(defaultAdminOperationTimeoutMs);
return sendRpcToTablet(rpc);
@@ -501,9 +542,9 @@ public class AsyncKuduClient implements AutoCloseable {
}
/**
- * This callback will be repeatadly used when opening a table until it is done being created.
+ * This callback will be repeatedly used when opening a table until it is done being created.
*/
- Callback<Deferred<KuduTable>, Master.IsCreateTableDoneResponsePB> getOpenTableCB(
+ private Callback<Deferred<KuduTable>, Master.IsCreateTableDoneResponsePB> getOpenTableCB(
final KuduRpc<KuduTable> rpc, final KuduTable table) {
return new Callback<Deferred<KuduTable>, Master.IsCreateTableDoneResponsePB>() {
@Override
@@ -635,18 +676,6 @@ public class AsyncKuduClient implements AutoCloseable {
return requestTracker;
}
- HashedWheelTimer getTimer() {
- return timer;
- }
-
- ClientSocketChannelFactory getChannelFactory() {
- return channelFactory;
- }
-
- SecurityContext getSecurityContext() {
- return securityContext;
- }
-
/**
* Creates a new {@link AsyncKuduScanner.AsyncKuduScannerBuilder} for a particular table.
* @param table the name of the table you intend to scan.
@@ -691,23 +720,20 @@ public class AsyncKuduClient implements AutoCloseable {
*/
Deferred<AsyncKuduScanner.Response> scanNextRows(final AsyncKuduScanner scanner) {
RemoteTablet tablet = scanner.currentTablet();
- assert (tablet != null);
+ Preconditions.checkNotNull(tablet);
KuduRpc<AsyncKuduScanner.Response> nextRequest = scanner.getNextRowsRequest();
- String uuid = tablet.getReplicaSelectedUUID(nextRequest.getReplicaSelection());
- TabletClient client = connectionCache.getClient(uuid);
// Important to increment the attempts before the next if statement since
// getSleepTimeForRpc() relies on it if the client is null or dead.
nextRequest.attempt++;
- if (client == null || !client.isAlive()) {
- // A null client means we either don't know about this tablet anymore (unlikely) or we
- // couldn't find a leader (which could be triggered by a read timeout).
- // We'll first delay the RPC in case things take some time to settle down, then retry.
- Status statusRemoteError = Status.RemoteError("Not connected to server " + uuid +
- " will retry after a delay");
- return delayedSendRpcToTablet(nextRequest, new RecoverableException(statusRemoteError));
+ final ServerInfo info = tablet.getReplicaSelectedServerInfo(nextRequest.getReplicaSelection());
+ if (info == null) {
+ return delayedSendRpcToTablet(nextRequest, new RecoverableException(Status.RemoteError(
+ String.format("No information on servers hosting tablet %s, will retry later",
+ tablet.getTabletId()))));
}
+
Deferred<AsyncKuduScanner.Response> d = nextRequest.getDeferred();
- client.sendRpc(nextRequest);
+ RpcProxy.sendRpc(this, connectionCache.getConnection(info), nextRequest);
return d;
}
@@ -723,55 +749,19 @@ public class AsyncKuduClient implements AutoCloseable {
if (tablet == null) {
return Deferred.fromResult(null);
}
-
- final KuduRpc<AsyncKuduScanner.Response> closeRequest = scanner.getCloseRequest();
- final TabletClient client = connectionCache.getClient(
- tablet.getReplicaSelectedUUID(closeRequest.getReplicaSelection()));
- if (client == null || !client.isAlive()) {
- // Oops, we couldn't find a tablet server that hosts this tablet. Our
- // cache was probably invalidated while the client was scanning. So
- // we can't close this scanner properly.
- LOG.warn("Cannot close {} properly, no connection open for {}", scanner, tablet);
+ final KuduRpc<AsyncKuduScanner.Response> closeRequest = scanner.getCloseRequest();
+ final ServerInfo info = tablet.getReplicaSelectedServerInfo(closeRequest.getReplicaSelection());
+ if (info == null) {
return Deferred.fromResult(null);
}
final Deferred<AsyncKuduScanner.Response> d = closeRequest.getDeferred();
closeRequest.attempt++;
- client.sendRpc(closeRequest);
+ RpcProxy.sendRpc(this, connectionCache.getConnection(info), closeRequest);
return d;
}
/**
- * Forcefully shuts down the RemoteTablet connection and
- * fails all outstanding RPCs.
- *
- * @param tablet the given tablet
- * @param replicaSelection replica selection mechanism to use
- */
- @VisibleForTesting
- void shutdownConnection(RemoteTablet tablet,
- ReplicaSelection replicaSelection) {
- TabletClient client = connectionCache.getClient(
- tablet.getReplicaSelectedUUID(replicaSelection));
- client.shutdown();
- }
-
- /**
- * Forcefully disconnects the RemoteTablet connection and
- * fails all outstanding RPCs.
- *
- * @param tablet the given tablet
- * @param replicaSelection replica selection mechanism to use
- */
- @VisibleForTesting
- void disconnect(RemoteTablet tablet,
- ReplicaSelection replicaSelection) {
- TabletClient client = connectionCache.getClient(
- tablet.getReplicaSelectedUUID(replicaSelection));
- client.disconnect();
- }
-
- /**
* Sends the provided {@link KuduRpc} to the tablet server hosting the leader
* of the tablet identified by the RPC's table and partition key.
*
@@ -812,15 +802,12 @@ public class AsyncKuduClient implements AutoCloseable {
// If we found a tablet, we'll try to find the TS to talk to.
if (entry != null) {
RemoteTablet tablet = entry.getTablet();
- String uuid = tablet.getReplicaSelectedUUID(request.getReplicaSelection());
- if (uuid != null) {
+ ServerInfo info = tablet.getReplicaSelectedServerInfo(request.getReplicaSelection());
+ if (info != null) {
Deferred<R> d = request.getDeferred();
request.setTablet(tablet);
- TabletClient client = connectionCache.getLiveClient(uuid);
- if (client != null) {
- client.sendRpc(request);
- return d;
- }
+ RpcProxy.sendRpc(this, connectionCache.getConnection(info), request);
+ return d;
}
}
@@ -923,7 +910,8 @@ public class AsyncKuduClient implements AutoCloseable {
* @param <R> Request's return type.
* @return An errback.
*/
- <R> Callback<Exception, Exception> getDelayedIsCreateTableDoneErrback(final KuduRpc<R> request) {
+ private <R> Callback<Exception, Exception> getDelayedIsCreateTableDoneErrback(
+ final KuduRpc<R> request) {
return new Callback<Exception, Exception>() {
@Override
public Exception call(Exception e) throws Exception {
@@ -944,26 +932,26 @@ public class AsyncKuduClient implements AutoCloseable {
* @param errback the errback to call if something goes wrong when calling IsCreateTableDone
* @return Deferred used to track the provided KuduRpc
*/
- <R> Deferred<R> delayedIsCreateTableDone(final KuduTable table, final KuduRpc<R> rpc,
- final Callback<Deferred<R>,
- Master.IsCreateTableDoneResponsePB> retryCB,
- final Callback<Exception, Exception> errback) {
+ private <R> Deferred<R> delayedIsCreateTableDone(
+ final KuduTable table,
+ final KuduRpc<R> rpc,
+ final Callback<Deferred<R>,
+ Master.IsCreateTableDoneResponsePB> retryCB,
+ final Callback<Exception, Exception> errback) {
final class RetryTimer implements TimerTask {
public void run(final Timeout timeout) {
String tableId = table.getTableId();
final boolean has_permit = acquireMasterLookupPermit();
- if (!has_permit) {
+ if (!has_permit && !tablesNotServed.contains(tableId)) {
// If we failed to acquire a permit, it's worth checking if someone
// looked up the tablet we're interested in. Every once in a while
// this will save us a Master lookup.
- if (!tablesNotServed.contains(tableId)) {
- try {
- retryCB.call(null);
- return;
- } catch (Exception e) {
- // we're calling RetryRpcCB which doesn't throw exceptions, ignore
- }
+ try {
+ retryCB.call(null);
+ return;
+ } catch (Exception e) {
+ // we're calling RetryRpcCB which doesn't throw exceptions, ignore
}
}
IsCreateTableDoneRequest isCreateTableDoneRequest =
@@ -1027,11 +1015,11 @@ public class AsyncKuduClient implements AutoCloseable {
}
}
- long getSleepTimeForRpc(KuduRpc<?> rpc) {
- byte attemptCount = rpc.attempt;
+ private long getSleepTimeForRpc(KuduRpc<?> rpc) {
+ int attemptCount = rpc.attempt;
assert (attemptCount > 0);
if (attemptCount == 0) {
- LOG.warn("Possible bug: attempting to retry an RPC with no attempts. RPC: " + rpc,
+ LOG.warn("Possible bug: attempting to retry an RPC with no attempts. RPC: {}", rpc,
new Exception("Exception created to collect stack trace"));
attemptCount = 1;
}
@@ -1039,29 +1027,12 @@ public class AsyncKuduClient implements AutoCloseable {
long sleepTime = (long)(Math.pow(2.0, Math.min(attemptCount, 12)) *
sleepRandomizer.nextDouble());
if (LOG.isTraceEnabled()) {
- LOG.trace("Going to sleep for " + sleepTime + " at retry " + rpc.attempt);
+ LOG.trace("Going to sleep for {} at retry {}", sleepTime, rpc.attempt);
}
return sleepTime;
}
/**
- * Modifying the list returned by this method won't change how AsyncKuduClient behaves,
- * but calling certain methods on the returned TabletClients can. For example,
- * it's possible to forcefully shutdown a connection to a tablet server by calling {@link
- * TabletClient#shutdown()}.
- * @return copy of the current TabletClients list
- */
- @VisibleForTesting
- List<TabletClient> getTabletClients() {
- return connectionCache.getImmutableTabletClientsList();
- }
-
- @VisibleForTesting
- TabletClient getTabletClient(String uuid) {
- return connectionCache.getClient(uuid);
- }
-
- /**
* Clears {@link #tableLocations} of the table's entries.
*
* This method makes the maps momentarily inconsistent, and should only be
@@ -1080,7 +1051,7 @@ public class AsyncKuduClient implements AutoCloseable {
* @return {@code true} if this RPC already had too many attempts,
* {@code false} otherwise (in which case it's OK to retry once more)
*/
- static boolean cannotRetryRequest(final KuduRpc<?> rpc) {
+ private static boolean cannotRetryRequest(final KuduRpc<?> rpc) {
return rpc.deadlineTracker.timedOut() || rpc.attempt > MAX_RPC_ATTEMPTS;
}
@@ -1091,8 +1062,8 @@ public class AsyncKuduClient implements AutoCloseable {
* @param cause What was cause of the last failed attempt, if known.
* You can pass {@code null} if the cause is unknown.
*/
- static <R> Deferred<R> tooManyAttemptsOrTimeout(final KuduRpc<R> request,
- final KuduException cause) {
+ private static <R> Deferred<R> tooManyAttemptsOrTimeout(final KuduRpc<R> request,
+ final KuduException cause) {
String message;
if (request.attempt > MAX_RPC_ATTEMPTS) {
message = "Too many attempts: ";
@@ -1127,7 +1098,7 @@ public class AsyncKuduClient implements AutoCloseable {
// this will save us a Master lookup.
TableLocationsCache.Entry entry = getTableLocationEntry(tableId, partitionKey);
if (entry != null && !entry.isNonCoveredRange() &&
- entry.getTablet().getLeaderUUID() != null) {
+ entry.getTablet().getLeaderServerInfo() != null) {
return Deferred.fromResult(null); // Looks like no lookup needed.
}
}
@@ -1164,9 +1135,8 @@ public class AsyncKuduClient implements AutoCloseable {
*/
Deferred<Master.GetTableLocationsResponsePB> getMasterTableLocationsPB(KuduRpc<?> parentRpc) {
// TODO(todd): stop using this 'masterTable' hack.
- return ConnectToCluster.run(masterTable, masterAddresses, parentRpc, connectionCache,
- defaultAdminOperationTimeoutMs)
- .addCallback(
+ return ConnectToCluster.run(masterTable, masterAddresses, parentRpc,
+ defaultAdminOperationTimeoutMs).addCallback(
new Callback<Master.GetTableLocationsResponsePB, ConnectToClusterResponse>() {
@Override
public Master.GetTableLocationsResponsePB call(ConnectToClusterResponse resp) {
@@ -1306,8 +1276,8 @@ public class AsyncKuduClient implements AutoCloseable {
* We're handling a tablet server that's telling us it doesn't have the tablet we're asking for.
* We're in the context of decode() meaning we need to either callback or retry later.
*/
- <R> void handleTabletNotFound(final KuduRpc<R> rpc, KuduException ex, TabletClient server) {
- invalidateTabletCache(rpc.getTablet(), server);
+ <R> void handleTabletNotFound(final KuduRpc<R> rpc, KuduException ex, ServerInfo info) {
+ invalidateTabletCache(rpc.getTablet(), info);
handleRetryableError(rpc, ex);
}
@@ -1315,8 +1285,8 @@ public class AsyncKuduClient implements AutoCloseable {
* A tablet server is letting us know that it isn't the specified tablet's leader in response
* a RPC, so we need to demote it and retry.
*/
- <R> void handleNotLeader(final KuduRpc<R> rpc, KuduException ex, TabletClient server) {
- rpc.getTablet().demoteLeader(server.getServerInfo().getUuid());
+ <R> void handleNotLeader(final KuduRpc<R> rpc, KuduException ex, ServerInfo info) {
+ rpc.getTablet().demoteLeader(info.getUuid());
handleRetryableError(rpc, ex);
}
@@ -1370,12 +1340,40 @@ public class AsyncKuduClient implements AutoCloseable {
* Remove the tablet server from the RemoteTablet's locations. Right now nothing is removing
* the tablet itself from the caches.
*/
- private void invalidateTabletCache(RemoteTablet tablet, TabletClient server) {
- String uuid = server.getServerInfo().getUuid();
+ private void invalidateTabletCache(RemoteTablet tablet, ServerInfo info) {
+ final String uuid = info.getUuid();
LOG.info("Removing server {} from this tablet's cache {}", uuid, tablet.getTabletId());
tablet.removeTabletClient(uuid);
}
+ /**
+ * Translate master-provided information {@link Master.TSInfoPB} on a tablet server into internal
+ * {@link ServerInfo} representation.
+ *
+ * @param tsInfoPB master-provided information for the tablet server
+ * @return an object that contains all the server's information
+ * @throws UnknownHostException if we cannot resolve the tablet server's IP address
+ */
+ private ServerInfo resolveTS(Master.TSInfoPB tsInfoPB) throws UnknownHostException {
+ final List<Common.HostPortPB> addresses = tsInfoPB.getRpcAddressesList();
+ final String uuid = tsInfoPB.getPermanentUuid().toStringUtf8();
+ if (addresses.isEmpty()) {
+ LOG.warn("Received a tablet server with no addresses, UUID: {}", uuid);
+ return null;
+ }
+
+ // from meta_cache.cc
+ // TODO: if the TS advertises multiple host/ports, pick the right one
+ // based on some kind of policy. For now just use the first always.
+ final HostAndPort hostPort = ProtobufHelper.hostAndPortFromPB(addresses.get(0));
+ final InetAddress inetAddress = NetUtil.getInetAddress(hostPort.getHost());
+ if (inetAddress == null) {
+ throw new UnknownHostException(
+ "Failed to resolve the IP of `" + addresses.get(0).getHost() + "'");
+ }
+ return new ServerInfo(uuid, hostPort, inetAddress);
+ }
+
/** Callback executed when a master lookup completes. */
private final class MasterLookupCB implements Callback<Object,
Master.GetTableLocationsResponsePB> {
@@ -1418,7 +1416,7 @@ public class AsyncKuduClient implements AutoCloseable {
}
}
- boolean acquireMasterLookupPermit() {
+ private boolean acquireMasterLookupPermit() {
try {
// With such a low timeout, the JVM may chose to spin-wait instead of
// de-scheduling the thread (and causing context switches and whatnot).
@@ -1433,7 +1431,7 @@ public class AsyncKuduClient implements AutoCloseable {
* Releases a master lookup permit that was acquired.
* @see #acquireMasterLookupPermit
*/
- void releaseMasterLookupPermit() {
+ private void releaseMasterLookupPermit() {
masterLookups.release();
}
@@ -1477,7 +1475,7 @@ public class AsyncKuduClient implements AutoCloseable {
List<ServerInfo> servers = new ArrayList<>(tabletPb.getReplicasCount());
for (Master.TabletLocationsPB.ReplicaPB replica : tabletPb.getReplicasList()) {
try {
- ServerInfo serverInfo = connectionCache.connectTS(replica.getTsInfo());
+ ServerInfo serverInfo = resolveTS(replica.getTsInfo());
if (serverInfo != null) {
servers.add(serverInfo);
}
@@ -1508,7 +1506,7 @@ public class AsyncKuduClient implements AutoCloseable {
// right away. If not, we throw an exception that RetryRpcErrback will understand as needing to
// sleep before retrying.
TableLocationsCache.Entry entry = locationsCache.get(requestPartitionKey);
- if (!entry.isNonCoveredRange() && entry.getTablet().getLeaderUUID() == null) {
+ if (!entry.isNonCoveredRange() && entry.getTablet().getLeaderServerInfo() == null) {
throw new NoLeaderFoundException(
Status.NotFound("Tablet " + entry.toString() + " doesn't have a leader"));
}
@@ -1588,8 +1586,9 @@ public class AsyncKuduClient implements AutoCloseable {
}
/**
- * Invokes {@link #shutdown()} and waits. This method returns
- * void, so consider invoking shutdown directly if there's a need to handle dangling RPCs.
+ * Invokes {@link #shutdown()} and waits. This method returns void, so consider invoking
+ * {@link #shutdown()} directly if there's a need to handle dangling RPCs.
+ *
* @throws Exception if an error happens while closing the connections
*/
@Override
@@ -1607,7 +1606,8 @@ public class AsyncKuduClient implements AutoCloseable {
* <li>Releases all other resources.</li>
* </ul>
* <strong>Not calling this method before losing the last reference to this
- * instance may result in data loss and other unwanted side effects</strong>
+ * instance may result in data loss and other unwanted side effects.</strong>
+ *
* @return A {@link Deferred}, whose callback chain will be invoked once all
* of the above have been done. If this callback chain doesn't fail, then
* the clean shutdown will be successful, and all the data will be safe on
@@ -1628,6 +1628,7 @@ public class AsyncKuduClient implements AutoCloseable {
super("AsyncKuduClient@" + AsyncKuduClient.super.hashCode() + " shutdown");
}
+ @Override
public void run() {
// This terminates the Executor.
channelFactory.releaseExternalResources();
@@ -1676,7 +1677,7 @@ public class AsyncKuduClient implements AutoCloseable {
// concurrent modification during the iteration.
Set<AsyncKuduSession> copyOfSessions;
synchronized (sessions) {
- copyOfSessions = new HashSet<AsyncKuduSession>(sessions);
+ copyOfSessions = new HashSet<>(sessions);
}
if (sessions.isEmpty()) {
return Deferred.fromResult(null);
@@ -1690,7 +1691,7 @@ public class AsyncKuduClient implements AutoCloseable {
return Deferred.group(deferreds);
}
- private boolean isMasterTable(String tableId) {
+ private static boolean isMasterTable(String tableId) {
// Checking that it's the same instance so there's absolutely no chance of confusing the master
// 'table' for a user one.
return MASTER_TABLE_NAME_PLACEHOLDER == tableId;
@@ -1709,6 +1710,14 @@ public class AsyncKuduClient implements AutoCloseable {
}
/**
+ * @return copy of the connection cache's current list of {@link Connection} objects
+ */
+ @VisibleForTesting
+ List<Connection> getConnectionListCopy() {
+ return connectionCache.getConnectionListCopy();
+ }
+
+ /**
* Builder class to use in order to connect to Kudu.
* All the parameters beyond those in the constructors are optional.
*/
http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
index 22b31f0..dfba55d 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Functions;
import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.net.HostAndPort;
import com.stumbleupon.async.Callback;
@@ -80,7 +81,8 @@ final class ConnectToCluster {
private static Deferred<ConnectToMasterResponsePB> connectToMaster(
final KuduTable masterTable,
- final TabletClient masterClient, KuduRpc<?> parentRpc,
+ final RpcProxy masterProxy,
+ KuduRpc<?> parentRpc,
long defaultTimeoutMs) {
// TODO: Handle the situation when multiple in-flight RPCs all want to query the masters,
// basically reuse in some way the master permits.
@@ -93,7 +95,7 @@ final class ConnectToCluster {
}
Deferred<ConnectToMasterResponsePB> d = rpc.getDeferred();
rpc.attempt++;
- masterClient.sendRpc(rpc);
+ masterProxy.sendRpc(rpc);
// If we are connecting to an older version of Kudu, we'll get an invalid request
// error. In that case, we resend using the older version of the RPC.
@@ -107,10 +109,10 @@ final class ConnectToCluster {
rre.getErrPB().getUnsupportedFeatureFlagsCount() > 0) {
AsyncKuduClient.LOG.debug("Falling back to GetMasterRegistration() RPC to connect " +
"to server running Kudu < 1.3.");
- Deferred<ConnectToMasterResponsePB> newAttempt = rpc.getDeferred();
- assert newAttempt != null;
+ final Deferred<ConnectToMasterResponsePB> newAttempt =
+ Preconditions.checkNotNull(rpc.getDeferred());
rpc.setUseOldMethod();
- masterClient.sendRpc(rpc);
+ masterProxy.sendRpc(rpc);
return newAttempt;
}
}
@@ -128,8 +130,6 @@ final class ConnectToCluster {
* @param masterTable the "placeholder" table used by AsyncKuduClient
* @param masterAddresses the addresses of masters to fetch from
* @param parentRpc RPC that prompted a master lookup, can be null
- * @param connCache the client's connection cache, used for creating connections
- * to masters
* @param defaultTimeoutMs timeout to use for RPCs if the parentRpc has no timeout
* @return a Deferred object for the cluster connection status
*/
@@ -137,7 +137,6 @@ final class ConnectToCluster {
KuduTable masterTable,
List<HostAndPort> masterAddresses,
KuduRpc<?> parentRpc,
- ConnectionCache connCache,
long defaultTimeoutMs) {
ConnectToCluster connector = new ConnectToCluster(masterAddresses);
@@ -146,14 +145,14 @@ final class ConnectToCluster {
// deferred.
for (HostAndPort hostAndPort : masterAddresses) {
Deferred<ConnectToMasterResponsePB> d;
- TabletClient client = connCache.newMasterClient(hostAndPort);
- if (client == null) {
+ RpcProxy proxy = masterTable.getAsyncClient().newMasterRpcProxy(hostAndPort);
+ if (proxy != null) {
+ d = connectToMaster(masterTable, proxy, parentRpc, defaultTimeoutMs);
+ } else {
String message = "Couldn't resolve this master's address " + hostAndPort.toString();
LOG.warn(message);
Status statusIOE = Status.IOError(message);
d = Deferred.fromError(new NonRecoverableException(statusIOE));
- } else {
- d = connectToMaster(masterTable, client, parentRpc, defaultTimeoutMs);
}
d.addCallbacks(connector.callbackForNode(hostAndPort),
connector.errbackForNode(hostAndPort));
@@ -192,56 +191,50 @@ final class ConnectToCluster {
* to responseD.
*/
private void incrementCountAndCheckExhausted() {
- if (countResponsesReceived.incrementAndGet() == numMasters) {
- if (responseDCalled.compareAndSet(false, true)) {
-
- // We want `allUnrecoverable` to only be true if all the masters came back with
- // NonRecoverableException so that we know for sure we can't retry anymore. Just one master
- // that replies with RecoverableException or with an ok response but is a FOLLOWER is
- // enough to keep us retrying.
- boolean allUnrecoverable = true;
- if (exceptionsReceived.size() == countResponsesReceived.get()) {
- for (Exception ex : exceptionsReceived) {
- if (!(ex instanceof NonRecoverableException)) {
- allUnrecoverable = false;
- break;
- }
+ if (countResponsesReceived.incrementAndGet() == numMasters &&
+ responseDCalled.compareAndSet(false, true)) {
+ // We want `allUnrecoverable` to only be true if all the masters came back with
+ // NonRecoverableException so that we know for sure we can't retry anymore. Just one master
+ // that replies with RecoverableException or with an ok response but is a FOLLOWER is
+ // enough to keep us retrying.
+ boolean allUnrecoverable = true;
+ if (exceptionsReceived.size() == countResponsesReceived.get()) {
+ for (Exception ex : exceptionsReceived) {
+ if (!(ex instanceof NonRecoverableException)) {
+ allUnrecoverable = false;
+ break;
}
- } else {
- allUnrecoverable = false;
}
+ } else {
+ allUnrecoverable = false;
+ }
- String allHosts = NetUtil.hostsAndPortsToString(masterAddrs);
- if (allUnrecoverable) {
- // This will stop retries.
- String msg = String.format("Couldn't find a valid master in (%s). " +
- "Exceptions received: %s", allHosts,
- Joiner.on(",").join(Lists.transform(
- exceptionsReceived, Functions.toStringFunction())));
- Status s = Status.ServiceUnavailable(msg);
- responseD.callback(new NonRecoverableException(s));
+ String allHosts = NetUtil.hostsAndPortsToString(masterAddrs);
+ if (allUnrecoverable) {
+ // This will stop retries.
+ String msg = String.format("Couldn't find a valid master in (%s). " +
+ "Exceptions received: %s", allHosts,
+ Joiner.on(",").join(Lists.transform(
+ exceptionsReceived, Functions.toStringFunction())));
+ Status s = Status.ServiceUnavailable(msg);
+ responseD.callback(new NonRecoverableException(s));
+ } else {
+ String message = String.format("Master config (%s) has no leader.",
+ allHosts);
+ Exception ex;
+ if (exceptionsReceived.isEmpty()) {
+ LOG.warn("None of the provided masters {} is a leader; will retry", allHosts);
+ ex = new NoLeaderFoundException(Status.ServiceUnavailable(message));
} else {
- String message = String.format("Master config (%s) has no leader.",
- allHosts);
- Exception ex;
- if (exceptionsReceived.isEmpty()) {
- LOG.warn(String.format(
- "None of the provided masters (%s) is a leader, will retry.",
- allHosts));
- ex = new NoLeaderFoundException(Status.ServiceUnavailable(message));
- } else {
- LOG.warn(String.format(
- "Unable to find the leader master (%s), will retry",
- allHosts));
- String joinedMsg = message + " Exceptions received: " +
- Joiner.on(",").join(Lists.transform(
- exceptionsReceived, Functions.toStringFunction()));
- Status s = Status.ServiceUnavailable(joinedMsg);
- ex = new NoLeaderFoundException(s,
- exceptionsReceived.get(exceptionsReceived.size() - 1));
- }
- responseD.callback(ex);
+ LOG.warn("Unable to find the leader master {}; will retry", allHosts);
+ String joinedMsg = message + " Exceptions received: " +
+ Joiner.on(",").join(Lists.transform(
+ exceptionsReceived, Functions.toStringFunction()));
+ Status s = Status.ServiceUnavailable(joinedMsg);
+ ex = new NoLeaderFoundException(s,
+ exceptionsReceived.get(exceptionsReceived.size() - 1));
}
+ responseD.callback(ex);
}
}
}
@@ -273,8 +266,7 @@ final class ConnectToCluster {
// Someone else already found a leader. This is somewhat unexpected
// because this means two nodes think they're the leader, but it's
// not impossible. We'll just ignore it.
- LOG.debug("Callback already invoked, discarding response(" + r.toString() + ") from " +
- hostAndPort.toString());
+ LOG.debug("Callback already invoked, discarding response({}) from {}", r, hostAndPort);
return null;
}
@@ -304,7 +296,7 @@ final class ConnectToCluster {
@Override
public Void call(Exception e) throws Exception {
- LOG.warn("Error receiving a response from: " + hostAndPort, e);
+ LOG.warn("Error receiving response from {}", hostAndPort, e);
exceptionsReceived.add(e);
incrementCountAndCheckExhausted();
return null;
http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
new file mode 100644
index 0000000..ea1e113
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
@@ -0,0 +1,634 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import java.net.InetSocketAddress;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.net.ssl.SSLException;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.stumbleupon.async.Callback;
+import com.stumbleupon.async.Deferred;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelEvent;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.DefaultChannelPipeline;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.SocketChannel;
+import org.jboss.netty.channel.socket.SocketChannelConfig;
+import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
+import org.jboss.netty.handler.timeout.ReadTimeoutException;
+import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kudu.client.Negotiator.Result;
+import org.apache.kudu.rpc.RpcHeader;
+import org.apache.kudu.rpc.RpcHeader.RpcFeatureFlag;
+
+/**
+ * Class representing a connection from the client to a Kudu server (master or tablet server):
+ * a high-level wrapper for the TCP connection between the client and the server.
+ * <p>
+ * It's a stateful handler that manages a connection to a Kudu server.
+ * <p>
+ * This handler manages the RPC IDs, and keeps track of the RPCs in flight for which
+ * a response is currently awaited, as well as temporarily buffered RPCs that are waiting
+ * to be sent to the server.
+ * <p>
+ * Acquiring the monitor on an object of this class will prevent it from
+ * accepting write requests as well as buffering requests if the underlying
+ * channel isn't connected.
+ * TODO(aserbin) clarify on the socketReadTimeoutMs and using per-RPC timeout settings.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class Connection extends SimpleChannelUpstreamHandler {
+ /** Information on the target server. */
+ private final ServerInfo serverInfo;
+
+ /** Security context to use for connection negotiation. */
+ private final SecurityContext securityContext;
+
+ /** Read timeout for the connection (used by Netty's ReadTimeoutHandler) */
+ private final long socketReadTimeoutMs;
+
+ /** Timer to monitor read timeouts for the connection (used by Netty's ReadTimeoutHandler) */
+ private final HashedWheelTimer timer;
+
+ /** The underlying Netty's socket channel. */
+ private final SocketChannel channel;
+
+ /**
+ * Set to true when disconnect initiated explicitly from the client side. The channelDisconnected
+ * event handler then knows not to log any warning about unexpected disconnection from the peer.
+ */
+ private volatile boolean explicitlyDisconnected = false;
+
+ /** Logger: a sink for the log messages originated from this class. */
+ private static final Logger LOG = LoggerFactory.getLogger(Connection.class);
+
+ private static final byte RPC_CURRENT_VERSION = 9;
+
+ /** Initial header sent by the client upon connection establishment. */
+ private static final byte[] CONNECTION_HEADER = new byte[]{'h', 'r', 'p', 'c',
+ RPC_CURRENT_VERSION, // RPC version.
+ 0,
+ 0
+ };
+
+ /** Lock to guard access to some of the fields below. */
+ private final ReentrantLock lock = new ReentrantLock();
+
+ /** A state of this object. */
+ @GuardedBy("lock")
+ private State state;
+
+ /**
+ * A hash table to store { callId, statusReportCallback } pairs, representing messages which have
+ * already been sent and pending responses from the server side. Once the server responds to a
+ * message, the corresponding entry is removed from the container and the response callback
+ * is invoked with the results represented by {@link CallResponseInfo}.
+ */
+ @GuardedBy("lock")
+ private HashMap<Integer, Callback<Void, CallResponseInfo>> inflightMessages = new HashMap<>();
+
+ /** Messages enqueued while the connection was not ready to start sending them over the wire. */
+ @GuardedBy("lock")
+ private ArrayList<QueuedMessage> queuedMessages = Lists.newArrayList();
+
+ /** The result of the connection negotiation. */
+ @GuardedBy("lock")
+ private Result negotiationResult = null;
+
+ /** A monotonically increasing counter for RPC IDs. */
+ @GuardedBy("lock")
+ private int nextCallId = 0;
+
+ /**
+ * Create a new connection object in the NEW state. The constructor builds the Netty pipeline
+ * and the socket channel, but does not start connecting to the server.
+ *
+ * @param serverInfo information on the target server
+ * @param securityContext security context to use for connection negotiation
+ * @param socketReadTimeoutMs read timeout for the connection, in milliseconds; the pipeline
+ * gets a read timeout handler only if this value is positive
+ * @param timer timer used by the read timeout handler
+ * @param channelFactory factory used to create the underlying socket channel
+ */
+ Connection(ServerInfo serverInfo,
+ SecurityContext securityContext,
+ long socketReadTimeoutMs,
+ HashedWheelTimer timer,
+ ClientSocketChannelFactory channelFactory) {
+ this.serverInfo = serverInfo;
+ this.securityContext = securityContext;
+ this.state = State.NEW;
+ this.socketReadTimeoutMs = socketReadTimeoutMs;
+ this.timer = timer;
+
+ // The pipeline reads socketReadTimeoutMs and timer in init(), so they are assigned above.
+ final ConnectionPipeline pipeline = new ConnectionPipeline();
+ pipeline.init();
+
+ channel = channelFactory.newChannel(pipeline);
+ SocketChannelConfig config = channel.getConfig();
+ config.setConnectTimeoutMillis(60000);
+ config.setTcpNoDelay(true);
+ // Unfortunately there is no way to override the keep-alive timeout in
+ // Java since the JRE doesn't expose any way to call setsockopt() with
+ // TCP_KEEPIDLE. And of course the default timeout is >2h. Sigh.
+ config.setKeepAlive(true);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Moves the connection from CONNECTING to NEGOTIATING, writes the connection header to the
+ * channel and starts the negotiation by installing a {@link Negotiator} in the pipeline
+ * before this handler.
+ */
+ @Override
+ public void channelConnected(final ChannelHandlerContext ctx,
+ final ChannelStateEvent e) {
+ lock.lock();
+ try {
+ Preconditions.checkState(state == State.CONNECTING);
+ state = State.NEGOTIATING;
+ } finally {
+ lock.unlock();
+ }
+ // The connection header goes out first, then the negotiator sends its hello message.
+ Channels.write(channel, ChannelBuffers.wrappedBuffer(CONNECTION_HEADER));
+ Negotiator negotiator = new Negotiator(serverInfo.getHostname(), securityContext);
+ ctx.getPipeline().addBefore(ctx.getName(), "negotiation", negotiator);
+ negotiator.sendHello(channel);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Logs every upstream event at TRACE level before delegating to the default dispatching.
+ */
+ @Override
+ public void handleUpstream(final ChannelHandlerContext ctx,
+ final ChannelEvent e) throws Exception {
+ // Guarded so that getLogPrefix() is not evaluated unless tracing is enabled.
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("{} upstream event {}", getLogPrefix(), e);
+ }
+ super.handleUpstream(ctx, e);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Fails all queued and in-flight messages and marks the connection as disconnected.
+ */
+ @Override
+ public void channelDisconnected(final ChannelHandlerContext ctx,
+ final ChannelStateEvent e) throws Exception {
+ // No need to call super.channelDisconnected(ctx, e) -- there should be nobody in the upstream
+ // pipeline after Connection itself. So, just handle the disconnection event ourselves.
+ cleanup("connection disconnected");
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Fails all queued and in-flight messages and marks the connection as disconnected. This is a
+ * no-op if the connection has already been cleaned up.
+ */
+ @Override
+ public void channelClosed(final ChannelHandlerContext ctx,
+ final ChannelStateEvent e) throws Exception {
+ // No need to call super.channelClosed(ctx, e) -- there should be nobody in the upstream
+ // pipeline after Connection itself. So, just handle the close event ourselves.
+ cleanup("connection closed");
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Handles three kinds of inbound messages:
+ * <ul>
+ * <li>a {@link Negotiator.Result}: the connection becomes READY and all the messages
+ * enqueued so far are sent out;</li>
+ * <li>a {@link CallResponse}: the response is matched by its call ID against the in-flight
+ * messages and the corresponding callback is invoked;</li>
+ * <li>anything else is forwarded upstream untouched.</li>
+ * </ul>
+ *
+ * @throws NonRecoverableException if the response carries no call ID or the call ID does not
+ * correspond to any in-flight message
+ */
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt) throws Exception {
+ Object m = evt.getMessage();
+ if (m instanceof Negotiator.Result) {
+ lock.lock();
+ try {
+ Preconditions.checkState(state == State.NEGOTIATING);
+ Preconditions.checkNotNull(queuedMessages);
+
+ state = State.READY;
+ negotiationResult = (Negotiator.Result) m;
+ List<QueuedMessage> queued = queuedMessages;
+ // The queuedMessages should not be used anymore once the connection is negotiated.
+ queuedMessages = null;
+ // Send out all the enqueued messages. This is done while holding the lock to preserve
+ // the sequence of the already enqueued and being-enqueued-right-now messages and simplify
+ // the logic which checks for the consistency using Preconditions.checkXxx() methods.
+ for (final QueuedMessage qm : queued) {
+ sendCallToWire(qm.message, qm.cb);
+ }
+ } finally {
+ lock.unlock();
+ }
+ return;
+ }
+
+ // Some other event which the connection does not handle.
+ if (!(m instanceof CallResponse)) {
+ ctx.sendUpstream(evt);
+ return;
+ }
+
+ final CallResponse response = (CallResponse) m;
+ final RpcHeader.ResponseHeader header = response.getHeader();
+ if (!header.hasCallId()) {
+ final int size = response.getTotalResponseSize();
+ final String msg = getLogPrefix() +
+ " RPC response (size: " + size + ") doesn't" + " have callID: " + header;
+ LOG.error(msg);
+ throw new NonRecoverableException(Status.Incomplete(msg));
+ }
+
+ // Take the callback out of the in-flight map under the lock, but invoke it outside of it.
+ final int callId = header.getCallId();
+ Callback<Void, CallResponseInfo> responseCbk;
+ lock.lock();
+ try {
+ Preconditions.checkState(state == State.READY);
+ responseCbk = inflightMessages.remove(callId);
+ } finally {
+ lock.unlock();
+ }
+
+ if (responseCbk == null) {
+ final String msg = getLogPrefix() + " invalid callID: " + callId;
+ LOG.error(msg);
+ // If we get a bad RPC ID back, we are probably somehow misaligned from
+ // the server. So, we disconnect the connection.
+ throw new NonRecoverableException(Status.IllegalState(msg));
+ }
+
+ if (!header.hasIsError() || !header.getIsError()) {
+ // The success case.
+ responseCbk.call(new CallResponseInfo(response, null));
+ return;
+ }
+
+ // The error case: the payload of the response is an ErrorStatusPB message.
+ final RpcHeader.ErrorStatusPB.Builder errorBuilder = RpcHeader.ErrorStatusPB.newBuilder();
+ KuduRpc.readProtobuf(response.getPBMessage(), errorBuilder);
+ final RpcHeader.ErrorStatusPB error = errorBuilder.build();
+ // A busy or unavailable server is reported as a recoverable error.
+ if (error.getCode().equals(RpcHeader.ErrorStatusPB.RpcErrorCodePB.ERROR_SERVER_TOO_BUSY) ||
+ error.getCode().equals(RpcHeader.ErrorStatusPB.RpcErrorCodePB.ERROR_UNAVAILABLE)) {
+ responseCbk.call(new CallResponseInfo(
+ response, new RecoverableException(Status.ServiceUnavailable(error.getMessage()))));
+ return;
+ }
+
+ final String message = getLogPrefix() + " server sent error " + error.getMessage();
+ LOG.error(message); // can be useful
+ responseCbk.call(new CallResponseInfo(
+ response, new RpcRemoteException(Status.RemoteError(message), error)));
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Logs the exception at a level that depends on its type and then tears the connection down:
+ * the channel is closed if it is still open, otherwise the cleanup is run directly.
+ */
+ @Override
+ public void exceptionCaught(final ChannelHandlerContext ctx,
+ final ExceptionEvent event) {
+ final Throwable e = event.getCause();
+ final Channel c = event.getChannel();
+
+ if (e instanceof RejectedExecutionException) {
+ LOG.warn("{} RPC rejected by the executor: {} (ignore if shutting down)", getLogPrefix(), e);
+ } else if (e instanceof ReadTimeoutException) {
+ LOG.debug("{} encountered a read timeout; closing the channel", getLogPrefix());
+ } else if (e instanceof ClosedChannelException) {
+ // Stay quiet if the disconnect was requested by the client itself.
+ if (!explicitlyDisconnected) {
+ LOG.info("{} lost connection to peer", getLogPrefix());
+ }
+ } else if (e instanceof SSLException && explicitlyDisconnected) {
+ // There's a race in Netty where, when we call Channel.close(), it tries
+ // to send a TLS 'shutdown' message and enters a shutdown state. If another
+ // thread races to send actual data on the channel, then Netty will get a
+ // bit confused that we are trying to send data and misinterpret it as a
+ // renegotiation attempt, and throw an SSLException. So, we just ignore any
+ // SSLException if we've already attempted to close.
+ LOG.debug("{} ignoring SSLException: already disconnected", getLogPrefix());
+ } else {
+ LOG.error("{} unexpected exception from downstream on {}: {}", getLogPrefix(), c, e);
+ }
+ if (c.isOpen()) {
+ Channels.close(c); // Will trigger channelClosed(), which will cleanup()
+ } else { // else: presumably a connection timeout.
+ cleanup(e.getMessage()); // => need to cleanup() from here directly.
+ }
+ }
+
+ /**
+ * Getter for the peer's end-point information.
+ *
+ * @return the information on the target server this connection was created with
+ */
+ public ServerInfo getServerInfo() {
+ return serverInfo;
+ }
+
+ /**
+ * Check whether this connection has been torn down.
+ *
+ * @return true iff the connection is in the DISCONNECTED state
+ */
+ boolean isDisconnected() {
+ final boolean disconnected;
+ lock.lock();
+ try {
+ // The state is guarded by the lock, so take a snapshot while holding it.
+ disconnected = (State.DISCONNECTED == state);
+ } finally {
+ lock.unlock();
+ }
+ return disconnected;
+ }
+
+ /**
+ * Get the RPC features advertised by the server during the connection negotiation.
+ * TODO(aserbin) make it possible to avoid calling this when the server features are not known yet
+ *
+ * @return the set of server's features, if known; null otherwise
+ */
+ @Nullable
+ Set<RpcFeatureFlag> getPeerFeatures() {
+ lock.lock();
+ try {
+ // No negotiation result yet means the features are not known.
+ return negotiationResult == null ? null : negotiationResult.serverFeatures;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Build the prefix used for the log messages related to this connection.
+ *
+ * @return string representation of the peer (i.e. the server) information suitable for logging
+ */
+ String getLogPrefix() {
+ final StringBuilder prefix = new StringBuilder("[peer ");
+ prefix.append(serverInfo.getUuid()).append("]");
+ return prefix.toString();
+ }
+
+ /**
+ * Enqueue outbound message for sending to the remote server via Kudu RPC. The enqueueMessage()
+ * accepts messages even if the connection hasn't yet been established: the enqueued messages
+ * are sent out as soon as the connection to the server is ready. The connection is initiated upon
+ * enqueuing the very first outbound message.
+ * <p>
+ * The call ID of the message is assigned by this method. If both the read timeout of the
+ * connection and the timeout of the message are positive, the latter is capped by the former.
+ *
+ * @param msg the outbound message; its header is amended by this method
+ * @param cb the callback to invoke with the response or with the error for this message
+ * @throws RecoverableException if the connection is already in the DISCONNECTED state
+ */
+ void enqueueMessage(RpcOutboundMessage msg, Callback<Void, CallResponseInfo> cb)
+ throws RecoverableException {
+ lock.lock();
+ try {
+ if (state == State.DISCONNECTED) {
+ // The upper-level caller should handle the exception and retry using a new connection.
+ throw new RecoverableException(Status.IllegalState(
+ "connection in DISCONNECTED state; cannot enqueue a message"));
+ }
+
+ if (state == State.NEW) {
+ // Schedule connecting to the server.
+ connect();
+ }
+
+ // Set the call identifier for the outgoing RPC.
+ final int callId = nextCallId++;
+ RpcHeader.RequestHeader.Builder headerBuilder = msg.getHeaderBuilder();
+ headerBuilder.setCallId(callId);
+
+ // Amend the timeout for the call, if necessary.
+ if (socketReadTimeoutMs > 0) {
+ final int timeoutMs = headerBuilder.getTimeoutMillis();
+ if (timeoutMs > 0) {
+ headerBuilder.setTimeoutMillis((int) Math.min(timeoutMs, socketReadTimeoutMs));
+ }
+ }
+
+ // If the connection hasn't been negotiated yet, add the message into the queuedMessages list.
+ // The elements of the queuedMessages list will be processed when the negotiation either
+ // succeeds or fails.
+ if (state != State.READY) {
+ queuedMessages.add(new QueuedMessage(msg, cb));
+ return;
+ }
+
+ // It's time to initiate sending the message over the wire.
+ sendCallToWire(msg, cb);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Triggers the channel to be disconnected, which will asynchronously cause all
+ * queued and in-flight RPCs to be failed. This method is idempotent.
+ *
+ * @return future object to wait on the disconnect completion, if necessary
+ */
+ ChannelFuture disconnect() {
+ // Mark the disconnect as requested by the client before touching the channel, so the
+ // exception handler does not report the closed channel as a lost connection.
+ explicitlyDisconnected = true;
+ return Channels.disconnect(channel);
+ }
+
+ /**
+ * If open, forcefully shut down the connection to the server. This is the same as
+ * {@link #disconnect}, but it returns Deferred instead of ChannelFuture.
+ *
+ * @return deferred object for tracking the shutting down of this connection; it completes
+ * with null on success and with an exception if the disconnect has failed
+ */
+ Deferred<Void> shutdown() {
+ final ChannelFuture disconnectFuture = disconnect();
+ final Deferred<Void> d = new Deferred<>();
+ disconnectFuture.addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(final ChannelFuture future) {
+ if (future.isSuccess()) {
+ d.callback(null);
+ return;
+ }
+ final Throwable t = future.getCause();
+ if (t instanceof Exception) {
+ // Passing an exception to callback() reports it as an error to the deferred.
+ d.callback(t);
+ } else {
+ // Wrap anything which is not an Exception. Connection.this is used on purpose: a bare
+ // 'this' would refer to the anonymous listener and print a useless identity string.
+ d.callback(new NonRecoverableException(
+ Status.IllegalState("failed to shutdown: " + Connection.this), t));
+ }
+ }
+ });
+ return d;
+ }
+
+ /**
+ * Build a description of this connection: the identity hash, the channel, the UUID of the
+ * server and the number of the queued and in-flight messages.
+ *
+ * @return string representation of this object (suitable for printing into the logs, etc.)
+ */
+ @Override
+ public String toString() {
+ final int queuedMessagesNum;
+ final int inflightMessagesNum;
+ lock.lock();
+ try {
+ // Both containers are set to null once they are not in use anymore.
+ queuedMessagesNum = queuedMessages == null ? 0 : queuedMessages.size();
+ inflightMessagesNum = inflightMessages == null ? 0 : inflightMessages.size();
+ } finally {
+ lock.unlock();
+ }
+ final StringBuilder buf = new StringBuilder();
+ buf.append("Connection@")
+ .append(hashCode())
+ .append("(channel=")
+ .append(channel)
+ .append(", uuid=")
+ .append(serverInfo.getUuid())
+ .append(", #queued=").append(queuedMessagesNum)
+ .append(", #inflight=").append(inflightMessagesNum)
+ .append(")");
+ return buf.toString();
+ }
+
+ /**
+ * Check whether the connection has been negotiated and is ready to send messages.
+ * This is test-only method.
+ *
+ * @return true iff the connection is in the READY state
+ */
+ @VisibleForTesting
+ boolean isReady() {
+ final boolean ready;
+ lock.lock();
+ try {
+ // The state is guarded by the lock, so take a snapshot while holding it.
+ ready = (State.READY == state);
+ } finally {
+ lock.unlock();
+ }
+ return ready;
+ }
+
+ /**
+ * Start sending the message to the server over the wire. The message is registered as an
+ * in-flight one under its call ID before it is written to the channel. The lock must be held
+ * by the calling thread and the connection must be in the READY state.
+ *
+ * @param msg the message to send; its header must already carry the call ID
+ * @param cb the callback to register for the response to this message
+ */
+ @GuardedBy("lock")
+ private void sendCallToWire(final RpcOutboundMessage msg, Callback<Void, CallResponseInfo> cb) {
+ Preconditions.checkState(lock.isHeldByCurrentThread());
+ Preconditions.checkState(state == State.READY);
+ Preconditions.checkNotNull(inflightMessages);
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("{} sending {}", getLogPrefix(), msg);
+ }
+ // A call ID must never be registered twice among the in-flight messages.
+ final Callback<Void, CallResponseInfo> previous =
+ inflightMessages.put(msg.getHeaderBuilder().getCallId(), cb);
+ Preconditions.checkArgument(previous == null);
+ Channels.write(channel, msg);
+ }
+
+ /**
+ * Handle the disconnection of the underlying channel: switch this object into the DISCONNECTED
+ * state and abort all the in-flight and enqueued messages, reporting a recoverable network
+ * error via their callbacks. It is up to the recipients of the error to retry sending the
+ * messages, if needed. Repeated calls are no-ops.
+ *
+ * @param errorMessage a string to describe the cause of the cleanup
+ */
+ private void cleanup(final String errorMessage) {
+ final List<QueuedMessage> abortedQueued;
+ final Map<Integer, Callback<Void, CallResponseInfo>> abortedInflight;
+
+ lock.lock();
+ try {
+ if (state == State.DISCONNECTED) {
+ // Already cleaned up: both containers must have been detached by then.
+ Preconditions.checkState(queuedMessages == null);
+ Preconditions.checkState(inflightMessages == null);
+ return;
+ }
+
+ // Detach the containers under the lock; the callbacks are invoked after releasing it.
+ abortedQueued = queuedMessages;
+ queuedMessages = null;
+
+ abortedInflight = inflightMessages;
+ inflightMessages = null;
+
+ state = State.DISCONNECTED;
+ } finally {
+ lock.unlock();
+ }
+
+ final String reason = errorMessage != null ? errorMessage : "connection reset";
+ final RecoverableException exception =
+ new RecoverableException(Status.NetworkError(getLogPrefix() + " " + reason));
+
+ for (Callback<Void, CallResponseInfo> callback : abortedInflight.values()) {
+ try {
+ callback.call(new CallResponseInfo(null, exception));
+ } catch (Exception e) {
+ LOG.warn("{} exception while aborting in-flight call: {}", getLogPrefix(), e);
+ }
+ }
+
+ // The queue is null if the connection had been negotiated before the disconnection.
+ if (abortedQueued == null) {
+ return;
+ }
+ for (QueuedMessage message : abortedQueued) {
+ try {
+ message.cb.call(new CallResponseInfo(null, exception));
+ } catch (Exception e) {
+ LOG.warn("{} exception while aborting enqueued call: {}", getLogPrefix(), e);
+ }
+ }
+ }
+
+ /**
+ * Initiate opening TCP connection to the server. The lock must be held by the calling thread
+ * and the connection must be in the NEW state; the state is switched to CONNECTING.
+ */
+ private void connect() {
+ Preconditions.checkState(lock.isHeldByCurrentThread());
+ Preconditions.checkState(state == State.NEW);
+ state = State.CONNECTING;
+ final InetSocketAddress peerAddress =
+ new InetSocketAddress(serverInfo.getResolvedAddress(), serverInfo.getPort());
+ channel.connect(peerAddress);
+ }
+
+ /**
+ * State of the Connection object. The state is switched to CONNECTING by connect(), to
+ * NEGOTIATING by channelConnected(), to READY by messageReceived() once the negotiation result
+ * arrives, and to DISCONNECTED by cleanup().
+ */
+ private enum State {
+ NEW, // The object has just been created.
+ CONNECTING, // The establishment of TCP connection to the server has started.
+ NEGOTIATING, // The connection negotiation has started.
+ READY, // The connection to the server has been opened, negotiated, and ready to use.
+ DISCONNECTED, // The TCP connection has been dropped off.
+ }
+
+ /**
+ * The class to represent RPC response received from the remote server.
+ * If the {@code exception} is null, then it's a success case and the {@code response} contains
+ * the information on the response. Otherwise it's an error and the {@code exception} provides
+ * information on the error. For the recoverable error case, the {@code exception} is of
+ * {@link RecoverableException} type, otherwise it's of {@link NonRecoverableException} type.
+ */
+ static final class CallResponseInfo {
+ /** The response from the server; null when the call is aborted due to a disconnection. */
+ public final CallResponse response;
+ /** The error for the call; null in the success case. */
+ public final KuduException exception;
+
+ CallResponseInfo(CallResponse response, KuduException exception) {
+ this.response = response;
+ this.exception = exception;
+ }
+ }
+
+ /**
+ * Internal class representing an enqueued outgoing message: the message itself along with
+ * the callback to invoke with the response or with the error for the message.
+ */
+ private static final class QueuedMessage {
+ /** The message waiting to be sent. */
+ private final RpcOutboundMessage message;
+ /** The callback to report the outcome of the call. */
+ private final Callback<Void, CallResponseInfo> cb;
+
+ QueuedMessage(RpcOutboundMessage message, Callback<Void, CallResponseInfo> cb) {
+ this.message = message;
+ this.cb = cb;
+ }
+ }
+
+ /**
+ * The helper class to build the Netty's connection pipeline. The handlers are installed in
+ * the following order: frame decoder, response decoder, request encoder, an optional read
+ * timeout handler, and the enclosing Connection itself as the last one.
+ */
+ private final class ConnectionPipeline extends DefaultChannelPipeline {
+ void init() {
+ final LengthFieldBasedFrameDecoder frameDecoder = new LengthFieldBasedFrameDecoder(
+ KuduRpc.MAX_RPC_SIZE,
+ 0, // length comes at offset 0
+ 4, // length prefix is 4 bytes long
+ 0, // no "length adjustment"
+ 4 /* strip the length prefix */);
+ super.addFirst("decode-frames", frameDecoder);
+ super.addLast("decode-inbound", new CallResponse.Decoder());
+ super.addLast("encode-outbound", new RpcOutboundMessage.Encoder());
+ // The read timeout handler is installed only if the read timeout is set.
+ final long readTimeoutMs = Connection.this.socketReadTimeoutMs;
+ if (readTimeoutMs > 0) {
+ super.addLast("timeout-handler", new ReadTimeoutHandler(
+ Connection.this.timer, readTimeoutMs, TimeUnit.MILLISECONDS));
+ }
+ super.addLast("kudu-handler", Connection.this);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
index 83ae8f0..844e79f 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
@@ -17,263 +17,126 @@
package org.apache.kudu.client;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.concurrent.GuardedBy;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
-import com.google.common.net.HostAndPort;
import com.stumbleupon.async.Deferred;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
-import org.jboss.netty.channel.DefaultChannelPipeline;
-import org.jboss.netty.channel.socket.SocketChannel;
-import org.jboss.netty.channel.socket.SocketChannelConfig;
-import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
-import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.kudu.Common;
-import org.apache.kudu.master.Master;
-import org.apache.kudu.util.NetUtil;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.util.HashedWheelTimer;
/**
- * The ConnectionCache is responsible for managing connections to masters and tablet servers.
- * There should only be one instance per Kudu client, and can <strong>not</strong> be shared between
- * clients.
- * <p>
- * {@link TabletClient}s are currently never removed from this cache. Since the map is keyed by
- * UUID, it would require an ever-growing set of unique tablet servers to encounter memory issues.
- * The reason for keeping disconnected connections in the cache is two-fold: 1) it makes
- * reconnecting easier since only UUIDs are passed around so we can use the dead TabletClient's
- * host and port to reconnect (see {@link #getLiveClient(String)}) and 2) having the dead
- * connection prevents tight looping when hitting "Connection refused"-type of errors.
+ * The ConnectionCache is responsible for managing connections to Kudu masters and tablet servers.
+ * There should only be one instance of ConnectionCache per Kudu client, and it should not be
+ * shared between clients.
* <p>
+ * Disconnected instances of the {@link Connection} class are replaced in the cache with new
+ * instances when the {@link #getConnection(ServerInfo)} method is called with the same
+ * destination. Since the map is keyed by UUID of the server, it would require an ever-growing
+ * set of unique Kudu servers to encounter memory issues.
+ *
* This class is thread-safe.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
class ConnectionCache {
+ /** Security context to use for connection negotiation. */
+ private final SecurityContext securityContext;
- private static final Logger LOG = LoggerFactory.getLogger(ConnectionCache.class);
+ /** Read timeout for connections (used by Netty's ReadTimeoutHandler) */
+ private final long socketReadTimeoutMs;
- /**
- * Cache that maps UUIDs to the clients connected to them.
- * <p>
- * This isn't a {@link ConcurrentHashMap} because we want to do an atomic get-and-put,
- * {@code putIfAbsent} isn't a good fit for us since it requires creating
- * an object that may be "wasted" in case another thread wins the insertion
- * race, and we don't want to create unnecessary connections.
- */
- @GuardedBy("lock")
- private final HashMap<String, TabletClient> uuid2client = new HashMap<>();
+ /** Timer to monitor read timeouts for connections (used by Netty's ReadTimeoutHandler) */
+ private final HashedWheelTimer timer;
- private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ /** Netty's channel factory to be used by connections. */
+ private final ClientSocketChannelFactory channelFactory;
- private final Lock readLock = lock.readLock();
- private final Lock writeLock = lock.readLock();
+ /** Synchronization primitive to guard access to the fields below. */
+ private final ReentrantLock lock = new ReentrantLock();
- private final AsyncKuduClient kuduClient;
+ @GuardedBy("lock")
+ private final HashMap<String, Connection> uuid2connection = new HashMap<>();
/**
- * Create a new empty ConnectionCache that will used the passed client to create connections.
- * @param client a client that contains the information we need to create connections
+ * Create a new empty ConnectionCache given the specified parameters.
*/
- ConnectionCache(AsyncKuduClient client) {
- this.kuduClient = client;
+ ConnectionCache(SecurityContext securityContext,
+ long socketReadTimeoutMs,
+ HashedWheelTimer timer,
+ ClientSocketChannelFactory channelFactory) {
+ this.securityContext = securityContext;
+ this.socketReadTimeoutMs = socketReadTimeoutMs;
+ this.timer = timer;
+ this.channelFactory = channelFactory;
}
/**
- * Create a connection to a tablet server based on information provided by the master.
- * @param tsInfoPB master-provided information for the tablet server
- * @return an object that contains all the server's information
- * @throws UnknownHostException if we cannot resolve the tablet server's IP address
+ * Get connection to the specified server. If no connection exists or the existing connection
+ * is already disconnected, then create a new connection to the specified server. The newly
+ * created connection is not negotiated until enqueuing the first RPC to the target server.
+ *
+ * @param serverInfo the server end-point to connect to
+ * @return a {@link Connection} instance for the specified destination
*/
- ServerInfo connectTS(Master.TSInfoPB tsInfoPB) throws UnknownHostException {
- List<Common.HostPortPB> addresses = tsInfoPB.getRpcAddressesList();
- String uuid = tsInfoPB.getPermanentUuid().toStringUtf8();
- if (addresses.isEmpty()) {
- LOG.warn("Received a tablet server with no addresses, UUID: {}", uuid);
- return null;
- }
-
- // from meta_cache.cc
- // TODO: if the TS advertises multiple host/ports, pick the right one
- // based on some kind of policy. For now just use the first always.
- HostAndPort hostPort = ProtobufHelper.hostAndPortFromPB(addresses.get(0));
- InetAddress inetAddress = NetUtil.getInetAddress(hostPort.getHost());
- if (inetAddress == null) {
- throw new UnknownHostException(
- "Failed to resolve the IP of `" + addresses.get(0).getHost() + "'");
- }
- return newClient(new ServerInfo(uuid, hostPort, inetAddress)).getServerInfo();
- }
-
- TabletClient newMasterClient(HostAndPort hostPort) {
- // We should pass a UUID here but we have a chicken and egg problem, we first need to
- // communicate with the masters to find out about them, and that's what we're trying to do.
- // The UUID is just used for logging and cache key, so instead we just use a constructed
- // string with the master host and port as.
- return newClient("master-" + hostPort.toString(), hostPort);
- }
+ public Connection getConnection(final ServerInfo serverInfo) {
+ Connection connection;
- TabletClient newClient(String uuid, HostAndPort hostPort) {
- InetAddress inetAddress = NetUtil.getInetAddress(hostPort.getHost());
- if (inetAddress == null) {
- // TODO(todd): should we log the resolution failure? throw an exception?
- return null;
- }
-
- ServerInfo serverInfo = new ServerInfo(uuid, hostPort, inetAddress);
- return newClient(serverInfo);
- }
-
- TabletClient newClient(ServerInfo serverInfo) {
- TabletClient client;
- SocketChannel chan;
-
- writeLock.lock();
+ lock.lock();
try {
- client = uuid2client.get(serverInfo.getUuid());
- if (client != null && client.isAlive()) {
- return client;
+ // First try to find an existing connection.
+ connection = uuid2connection.get(serverInfo.getUuid());
+ if (connection == null || connection.isDisconnected()) {
+ // If no valid connection is found, create a new one.
+ connection = new Connection(serverInfo, securityContext,
+ socketReadTimeoutMs, timer, channelFactory);
+ uuid2connection.put(serverInfo.getUuid(), connection);
}
- final TabletClientPipeline pipeline = new TabletClientPipeline();
- client = pipeline.init(serverInfo);
- chan = this.kuduClient.getChannelFactory().newChannel(pipeline);
- uuid2client.put(serverInfo.getUuid(), client);
- } finally {
- writeLock.unlock();
- }
-
- final SocketChannelConfig config = chan.getConfig();
- config.setConnectTimeoutMillis(5000);
- config.setTcpNoDelay(true);
- // Unfortunately there is no way to override the keep-alive timeout in
- // Java since the JRE doesn't expose any way to call setsockopt() with
- // TCP_KEEPIDLE. And of course the default timeout is >2h. Sigh.
- config.setKeepAlive(true);
- chan.connect(new InetSocketAddress(serverInfo.getResolvedAddress(),
- serverInfo.getPort())); // Won't block.
- return client;
- }
-
- /**
- * Get a connection to a server for the given UUID. The returned connection can be down and its
- * state can be queried via {@link TabletClient#isAlive()}. To automatically get a client that's
- * gonna be re-connected automatically, use {@link #getLiveClient(String)}.
- * @param uuid server's identifier
- * @return a connection to a server, or null if the passed UUID isn't known
- */
- TabletClient getClient(String uuid) {
- readLock.lock();
- try {
- return uuid2client.get(uuid);
} finally {
- readLock.unlock();
+ lock.unlock();
}
- }
- /**
- * Get a connection to a server for the given UUID. This method will automatically call
- * {@link #newClient(String, InetAddress, int)} if the cached connection is down.
- * @param uuid server's identifier
- * @return a connection to a server, or null if the passed UUID isn't known
- */
- TabletClient getLiveClient(String uuid) {
- TabletClient client = getClient(uuid);
-
- if (client == null) {
- return null;
- } else if (client.isAlive()) {
- return client;
- } else {
- return newClient(client.getServerInfo());
- }
+ return connection;
}
/**
- * Asynchronously closes every socket, which will also cancel all the RPCs in flight.
+ * Asynchronously terminate every connection. This also cancels all the pending and in-flight
+ * RPCs.
*/
Deferred<ArrayList<Void>> disconnectEverything() {
- readLock.lock();
+ lock.lock();
try {
- ArrayList<Deferred<Void>> deferreds = new ArrayList<>(uuid2client.size());
- for (TabletClient ts : uuid2client.values()) {
- deferreds.add(ts.shutdown());
+ ArrayList<Deferred<Void>> deferreds = new ArrayList<>(uuid2connection.size());
+ for (Connection c : uuid2connection.values()) {
+ deferreds.add(c.shutdown());
}
return Deferred.group(deferreds);
} finally {
- readLock.unlock();
+ lock.unlock();
}
}
/**
- * The list returned by this method can't be modified,
- * but calling certain methods on the returned TabletClients can have an effect. For example,
- * it's possible to forcefully shutdown a connection to a tablet server by calling {@link
- * TabletClient#shutdown()}.
- * @return copy of the current TabletClients list
- */
- List<TabletClient> getImmutableTabletClientsList() {
- readLock.lock();
- try {
- return ImmutableList.copyOf(uuid2client.values());
- } finally {
- readLock.unlock();
- }
- }
-
- /**
- * Queries all the cached connections if they are alive.
- * @return true if all the connections are down, else false
+ * Return a copy of the all-connections-list. This method is exposed only to allow
+ * {@link AsyncKuduClient} to forward it, so tests could get access to the underlying elements
+ * of the cache.
+ *
+ * @return a copy of the list of all connections in the connection cache
*/
@VisibleForTesting
- boolean allConnectionsAreDead() {
- readLock.lock();
+ List<Connection> getConnectionListCopy() {
+ lock.lock();
try {
- for (TabletClient tserver : uuid2client.values()) {
- if (tserver.isAlive()) {
- return false;
- }
- }
+ return ImmutableList.copyOf(uuid2connection.values());
} finally {
- readLock.unlock();
- }
- return true;
- }
-
- private final class TabletClientPipeline extends DefaultChannelPipeline {
- TabletClient init(ServerInfo serverInfo) {
- super.addFirst("decode-frames", new LengthFieldBasedFrameDecoder(
- KuduRpc.MAX_RPC_SIZE,
- 0, // length comes at offset 0
- 4, // length prefix is 4 bytes long
- 0, // no "length adjustment"
- 4 /* strip the length prefix */));
- super.addLast("decode-inbound", new CallResponse.Decoder());
- super.addLast("encode-outbound", new RpcOutboundMessage.Encoder());
- AsyncKuduClient kuduClient = ConnectionCache.this.kuduClient;
- final TabletClient client = new TabletClient(kuduClient, serverInfo);
- if (kuduClient.getDefaultSocketReadTimeoutMs() > 0) {
- super.addLast("timeout-handler",
- new ReadTimeoutHandler(kuduClient.getTimer(),
- kuduClient.getDefaultSocketReadTimeoutMs(),
- TimeUnit.MILLISECONDS));
- }
- super.addLast("kudu-handler", client);
-
- return client;
+ lock.unlock();
}
}
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
index 30f1a9c..0de894f 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
@@ -131,13 +131,13 @@ public abstract class KuduRpc<R> {
* that access this attribute will have a happens-before relationship with
* the rest of the code, due to other existing synchronization.
*/
- byte attempt; // package-private for TabletClient and AsyncKuduClient only.
+ int attempt; // package-private for RpcProxy and AsyncKuduClient only.
/**
- * Set by TabletClient when isRequestTracked returns true to identify this RPC in the sequence of
+ * Set by RpcProxy when isRequestTracked returns true to identify this RPC in the sequence of
* RPCs sent by this client. Once it is set it should never change unless the RPC is reused.
*/
- long sequenceId = RequestTracker.NO_SEQ_NO;
+ private long sequenceId = RequestTracker.NO_SEQ_NO;
KuduRpc(KuduTable table) {
this.table = table;
http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
index 46d99db..a917af2 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
@@ -147,9 +147,6 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
*/
private DecoderEmbedder<ChannelBuffer> sslEmbedder;
- /** True if we have negotiated TLS with the server */
- private boolean negotiatedTls;
-
/**
* The nonce sent from the server to the client, or null if negotiation has
* not yet taken place, or the server does not send a nonce.
@@ -290,7 +287,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
// Store the supported features advertised by the server.
serverFeatures = getFeatureFlags(response);
// If the server supports TLS, we will always speak TLS to it.
- negotiatedTls = serverFeatures.contains(RpcFeatureFlag.TLS);
+ final boolean negotiatedTls = serverFeatures.contains(RpcFeatureFlag.TLS);
// Check the negotiated authentication type sent by the server.
chosenAuthnType = chooseAuthenticationType(response);
http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
index d4702f8..a6025f7 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
@@ -21,6 +21,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import com.google.common.base.Objects;
@@ -38,7 +39,7 @@ import org.apache.kudu.master.Master;
* This class encapsulates the information regarding a tablet and its locations.
* <p>
* RemoteTablet's main function is to keep track of where the leader for this
- * tablet is. For example, an RPC might call {@link #getLeaderUUID()}, contact that TS, find
+ * tablet is. For example, an RPC might call {@link #getLeaderServerInfo()}, contact that TS, find
* it's not the leader anymore, and then call {@link #demoteLeader(String)}.
* <p>
* A RemoteTablet's life is expected to be long in a cluster where roles aren't changing often,
@@ -138,53 +139,61 @@ class RemoteTablet implements Comparable<RemoteTablet> {
}
/**
- * Gets the UUID of the tablet server that we think holds the leader replica for this tablet.
- * @return a UUID of a tablet server that we think has the leader, else null
+ * Get the information on the tablet server that we think holds the leader replica for this
+ * tablet.
+ *
+ * @return information on a tablet server that we think has the leader, else null
*/
- String getLeaderUUID() {
+ @Nullable
+ ServerInfo getLeaderServerInfo() {
synchronized (tabletServers) {
- return leaderUuid;
+ return tabletServers.get(leaderUuid);
}
}
/**
- * Gets the UUID of the closest server. If none is closer than the others, returns a random
- * server UUID.
- * @return the UUID of the closest server, which might be any if none is closer, or null if this
- * cache doesn't know of any servers
+ * Get the information on the closest server. If none is closer than the others,
+ * return the information on a randomly picked server.
+ *
+ * @return the information on the closest server, which might be any if none is closer, or null
+ * if this cache doesn't know any servers.
*/
- String getClosestUUID() {
+ @Nullable
+ ServerInfo getClosestServerInfo() {
synchronized (tabletServers) {
- String lastUuid = null;
- for (ServerInfo serverInfo : tabletServers.values()) {
- lastUuid = serverInfo.getUuid();
- if (serverInfo.isLocal()) {
- return serverInfo.getUuid();
+ ServerInfo last = null;
+ for (ServerInfo e : tabletServers.values()) {
+ last = e;
+ if (e.isLocal()) {
+ return e;
}
}
- return lastUuid;
+ return last;
}
}
/**
* Helper function to centralize the calling of methods based on the passed replica selection
* mechanism.
+ *
* @param replicaSelection replica selection mechanism to use
- * @return a UUID for the server that matches the selection, can be null
+ * @return information on the server that matches the selection, can be null
*/
- String getReplicaSelectedUUID(ReplicaSelection replicaSelection) {
+ @Nullable
+ ServerInfo getReplicaSelectedServerInfo(ReplicaSelection replicaSelection) {
switch (replicaSelection) {
case LEADER_ONLY:
- return getLeaderUUID();
+ return getLeaderServerInfo();
case CLOSEST_REPLICA:
- return getClosestUUID();
+ return getClosestServerInfo();
default:
- throw new RuntimeException("Unknown replica selection mechanism " + replicaSelection);
+ throw new RuntimeException("unknown replica selection mechanism " + replicaSelection);
}
}
/**
- * Gets the replicas of this tablet. The returned list may not be mutated.
+ * Get replicas of this tablet. The returned list may not be mutated.
+ *
* @return the replicas of the tablet
*/
List<LocatedTablet.Replica> getReplicas() {