You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2017/07/12 21:50:03 UTC
kudu git commit: [java] KUDU-2013 re-acquire authn token if expired
Repository: kudu
Updated Branches:
refs/heads/master 81939782a -> 603c1578c
[java] KUDU-2013 re-acquire authn token if expired
This patch introduces automatic authn token re-acquisition when the
current authn token expires. The client automatically retries the RPC
that hits the token expiration error (the error to re-try is seen as
FATAL_INVALID_AUTHENTICATION_TOKEN sent by the server during connection
negotiation).
Added a few tests to exercise the new retry logic for automatic
token re-acquisition in case of master-only operations, a bare minimum
workload scenario, and one special case of a connection to the master
opened with secondary credentials.
Change-Id: I0be620629c9a8345ecd5e5679c80ee76ca4eaa57
Reviewed-on: http://gerrit.cloudera.org:8080/7250
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/603c1578
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/603c1578
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/603c1578
Branch: refs/heads/master
Commit: 603c1578c78c0377ffafdd9c427ebfd8a206bda3
Parents: 8193978
Author: Alexey Serbin <as...@cloudera.com>
Authored: Tue Jun 20 15:25:42 2017 -0700
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Wed Jul 12 21:49:15 2017 +0000
----------------------------------------------------------------------
.../org/apache/kudu/client/AsyncKuduClient.java | 135 +++++++++++---
.../kudu/client/AuthnTokenReacquirer.java | 178 +++++++++++++++++++
.../apache/kudu/client/ConnectToCluster.java | 7 +-
.../java/org/apache/kudu/client/Connection.java | 177 +++++++++++++-----
.../org/apache/kudu/client/ConnectionCache.java | 111 +++++++-----
.../kudu/client/InvalidAuthnTokenException.java | 39 ++++
.../java/org/apache/kudu/client/Negotiator.java | 52 ++++--
.../java/org/apache/kudu/client/RpcProxy.java | 24 +--
.../org/apache/kudu/client/RpcTraceFrame.java | 6 +
.../org/apache/kudu/client/SecurityContext.java | 15 +-
.../apache/kudu/client/TestAsyncKuduClient.java | 2 +-
.../kudu/client/TestAuthnTokenReacquire.java | 143 +++++++++++++++
.../client/TestAuthnTokenReacquireOpen.java | 99 +++++++++++
.../apache/kudu/client/TestConnectionCache.java | 20 +--
.../org/apache/kudu/client/TestNegotiator.java | 16 +-
15 files changed, 872 insertions(+), 152 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/603c1578/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index a304d05..aec75e7 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -204,11 +204,13 @@ public class AsyncKuduClient implements AutoCloseable {
private final SecurityContext securityContext;
+ /** A helper to facilitate re-acquiring of authentication token if current one expires. */
+ private final AuthnTokenReacquirer tokenReacquirer;
+
private volatile boolean closed;
private AsyncKuduClient(AsyncKuduClientBuilder b) {
this.channelFactory = b.createChannelFactory();
- this.securityContext = new SecurityContext(b.subject);
this.masterAddresses = b.masterAddresses;
this.masterTable = new KuduTable(this, MASTER_TABLE_NAME_PLACEHOLDER,
MASTER_TABLE_NAME_PLACEHOLDER, null, null);
@@ -216,34 +218,57 @@ public class AsyncKuduClient implements AutoCloseable {
this.defaultAdminOperationTimeoutMs = b.defaultAdminOperationTimeoutMs;
this.defaultSocketReadTimeoutMs = b.defaultSocketReadTimeoutMs;
this.statisticsDisabled = b.statisticsDisabled;
- statistics = statisticsDisabled ? null : new Statistics();
+ this.statistics = statisticsDisabled ? null : new Statistics();
this.timer = b.timer;
- String clientId = UUID.randomUUID().toString().replace("-", "");
- this.requestTracker = new RequestTracker(clientId);
+ this.requestTracker = new RequestTracker(UUID.randomUUID().toString().replace("-", ""));
+
+ this.securityContext = new SecurityContext(b.subject);
this.connectionCache = new ConnectionCache(
securityContext, defaultSocketReadTimeoutMs, timer, channelFactory);
+ this.tokenReacquirer = new AuthnTokenReacquirer(this);
}
/**
- * Get a proxy to send RPC calls to the specified server.
+ * Get a proxy to send RPC calls to the specified server. The result proxy object does not
+ * restrict the type of credentials that may be used to connect to the server: it will use the
+ * secondary credentials if available, otherwise SASL credentials are used to authenticate
+ * the client when negotiating the connection to the server.
*
* @param serverInfo server's information
* @return the proxy object bound to the target server
*/
@Nonnull
RpcProxy newRpcProxy(final ServerInfo serverInfo) {
- Preconditions.checkNotNull(serverInfo);
- return new RpcProxy(this, connectionCache.getConnection(serverInfo));
+ return newRpcProxy(serverInfo, Connection.CredentialsPolicy.ANY_CREDENTIALS);
+ }
+
+ /**
+ * Get a proxy to send RPC calls to the specified server. The result proxy object should use
+ * a connection to the server negotiated with the specified credentials policy.
+ *
+ * @param serverInfo target server information
+ * @param credentialsPolicy authentication credentials policy to use for the connection
+ * negotiation
+ * @return the proxy object bound to the target server
+ */
+ @Nonnull
+ private RpcProxy newRpcProxy(final ServerInfo serverInfo,
+ Connection.CredentialsPolicy credentialsPolicy) {
+ final Connection connection = connectionCache.getConnection(serverInfo, credentialsPolicy);
+ return new RpcProxy(this, connection);
}
/**
* Get a proxy to send RPC calls to Kudu master at the specified end-point.
*
* @param hostPort master end-point
+ * @param credentialsPolicy credentials policy to use for the connection negotiation to the target
+ * master server
* @return the proxy object bound to the target master
*/
@Nullable
- RpcProxy newMasterRpcProxy(HostAndPort hostPort) {
+ RpcProxy newMasterRpcProxy(HostAndPort hostPort,
+ Connection.CredentialsPolicy credentialsPolicy) {
// We should have a UUID to construct ServerInfo for the master, but we have a chicken
// and egg problem, we first need to communicate with the masters to find out about them,
// and that's what we're trying to do. The UUID is just used for logging and cache key,
@@ -253,7 +278,43 @@ public class AsyncKuduClient implements AutoCloseable {
// TODO(todd): should we log the resolution failure? throw an exception?
return null;
}
- return newRpcProxy(new ServerInfo("master-" + hostPort.toString(), hostPort, inetAddress));
+ return newRpcProxy(
+ new ServerInfo("master-" + hostPort.toString(), hostPort, inetAddress), credentialsPolicy);
+ }
+
+ void reconnectToCluster(Callback<Void, Boolean> cb,
+ Callback<Void, Exception> eb) {
+
+ final class ReconnectToClusterCB implements Callback<Void, ConnectToClusterResponse> {
+ private final Callback<Void, Boolean> cb;
+
+ ReconnectToClusterCB(Callback<Void, Boolean> cb) {
+ this.cb = Preconditions.checkNotNull(cb);
+ }
+
+ /**
+ * Report on the token re-acquisition results. The result authn token might be null: in that
+ * case the SASL credentials will be used to negotiate future connections.
+ */
+ @Override
+ public Void call(ConnectToClusterResponse resp) throws Exception {
+ final Master.ConnectToMasterResponsePB masterResponsePB = resp.getConnectResponse();
+ if (masterResponsePB.hasAuthnToken()) {
+ LOG.info("connect to master: received a new authn token");
+ securityContext.setAuthenticationToken(masterResponsePB.getAuthnToken());
+ cb.call(true);
+ } else {
+ LOG.warn("connect to master: received no authn token");
+ securityContext.setAuthenticationToken(null);
+ cb.call(false);
+ }
+ return null;
+ }
+ }
+
+ ConnectToCluster.run(masterTable, masterAddresses, null, defaultAdminOperationTimeoutMs,
+ Connection.CredentialsPolicy.PRIMARY_CREDENTIALS).addCallbacks(
+ new ReconnectToClusterCB(cb), eb);
}
/**
@@ -719,8 +780,7 @@ public class AsyncKuduClient implements AutoCloseable {
* @return A deferred row.
*/
Deferred<AsyncKuduScanner.Response> scanNextRows(final AsyncKuduScanner scanner) {
- RemoteTablet tablet = scanner.currentTablet();
- Preconditions.checkNotNull(tablet);
+ RemoteTablet tablet = Preconditions.checkNotNull(scanner.currentTablet());
KuduRpc<AsyncKuduScanner.Response> nextRequest = scanner.getNextRowsRequest();
// Important to increment the attempts before the next if statement since
// getSleepTimeForRpc() relies on it if the client is null or dead.
@@ -733,7 +793,8 @@ public class AsyncKuduClient implements AutoCloseable {
}
Deferred<AsyncKuduScanner.Response> d = nextRequest.getDeferred();
- RpcProxy.sendRpc(this, connectionCache.getConnection(info), nextRequest);
+ RpcProxy.sendRpc(this, connectionCache.getConnection(
+ info, Connection.CredentialsPolicy.ANY_CREDENTIALS), nextRequest);
return d;
}
@@ -757,7 +818,8 @@ public class AsyncKuduClient implements AutoCloseable {
final Deferred<AsyncKuduScanner.Response> d = closeRequest.getDeferred();
closeRequest.attempt++;
- RpcProxy.sendRpc(this, connectionCache.getConnection(info), closeRequest);
+ RpcProxy.sendRpc(this, connectionCache.getConnection(
+ info, Connection.CredentialsPolicy.ANY_CREDENTIALS), closeRequest);
return d;
}
@@ -806,7 +868,8 @@ public class AsyncKuduClient implements AutoCloseable {
if (info != null) {
Deferred<R> d = request.getDeferred();
request.setTablet(tablet);
- RpcProxy.sendRpc(this, connectionCache.getConnection(info), request);
+ RpcProxy.sendRpc(this, connectionCache.getConnection(
+ info, Connection.CredentialsPolicy.ANY_CREDENTIALS), request);
return d;
}
}
@@ -1066,15 +1129,14 @@ public class AsyncKuduClient implements AutoCloseable {
final KuduException cause) {
String message;
if (request.attempt > MAX_RPC_ATTEMPTS) {
- message = "Too many attempts: ";
+ message = "too many attempts: ";
} else {
- message = "RPC can not complete before timeout: ";
+ message = "can not complete before timeout: ";
}
Status statusTimedOut = Status.TimedOut(message + request);
- final Exception e = new NonRecoverableException(statusTimedOut, cause);
- LOG.debug("Cannot continue with this RPC: {} because of: {}", request, message, e);
+ LOG.debug("Cannot continue with RPC because of: {}", statusTimedOut);
Deferred<R> d = request.getDeferred();
- request.errback(e);
+ request.errback(new NonRecoverableException(statusTimedOut, cause));
return d;
}
@@ -1136,14 +1198,13 @@ public class AsyncKuduClient implements AutoCloseable {
Deferred<Master.GetTableLocationsResponsePB> getMasterTableLocationsPB(KuduRpc<?> parentRpc) {
// TODO(todd): stop using this 'masterTable' hack.
return ConnectToCluster.run(masterTable, masterAddresses, parentRpc,
- defaultAdminOperationTimeoutMs).addCallback(
+ defaultAdminOperationTimeoutMs, Connection.CredentialsPolicy.ANY_CREDENTIALS).addCallback(
new Callback<Master.GetTableLocationsResponsePB, ConnectToClusterResponse>() {
@Override
public Master.GetTableLocationsResponsePB call(ConnectToClusterResponse resp) {
- // If the response has security info, adopt it.
if (resp.getConnectResponse().hasAuthnToken()) {
- securityContext.setAuthenticationToken(
- resp.getConnectResponse().getAuthnToken());
+ // If the response has security info, adopt it.
+ securityContext.setAuthenticationToken(resp.getConnectResponse().getAuthnToken());
}
List<ByteString> caCerts = resp.getConnectResponse().getCaCertDerList();
if (!caCerts.isEmpty()) {
@@ -1298,6 +1359,32 @@ public class AsyncKuduClient implements AutoCloseable {
}
/**
+ * Same as {@link #handleRetryableError(KuduRpc, KuduException)}, but without the delay before
+ * retrying the RPC.
+ *
+ * @param rpc the RPC to retry
+ * @param ex the exception which led to the attempt of RPC retry
+ */
+ <R> void handleRetryableErrorNoDelay(final KuduRpc<R> rpc, KuduException ex) {
+ if (cannotRetryRequest(rpc)) {
+ tooManyAttemptsOrTimeout(rpc, ex);
+ return;
+ }
+ sendRpcToTablet(rpc);
+ }
+
+ /**
+ * Handle a RPC failed due to invalid authn token error. In short, connect to the Kudu cluster
+ * to acquire a new authentication token and retry the RPC once a new authentication token
+ * is put into the {@link #securityContext}.
+ *
+ * @param rpc the RPC which failed due to invalid authn token
+ */
+ <R> void handleInvalidToken(KuduRpc<R> rpc) {
+ tokenReacquirer.handleAuthnTokenExpiration(rpc);
+ }
+
+ /**
* This methods enable putting RPCs on hold for a period of time determined by
* {@link #getSleepTimeForRpc(KuduRpc)}. If the RPC is out of time/retries, its errback will
* be immediately called.
@@ -1560,7 +1647,7 @@ public class AsyncKuduClient implements AutoCloseable {
public Deferred<LocatedTablet> call(List<LocatedTablet> tablets) {
Preconditions.checkArgument(tablets.size() <= 1,
"found more than one tablet for a single partition key");
- if (tablets.size() == 0) {
+ if (tablets.isEmpty()) {
// Most likely this indicates a non-covered range, but since this
// could race with an alter table partitioning operation (which
// clears the local table locations cache), we check again.
http://git-wip-us.apache.org/repos/asf/kudu/blob/603c1578/java/kudu-client/src/main/java/org/apache/kudu/client/AuthnTokenReacquirer.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AuthnTokenReacquirer.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AuthnTokenReacquirer.java
new file mode 100644
index 0000000..8f4811b
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AuthnTokenReacquirer.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kudu.client;
+
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.concurrent.GuardedBy;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.stumbleupon.async.Callback;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * A utility class to reacquire authentication token when the current one expires.
+ */
+@InterfaceAudience.Private
+final class AuthnTokenReacquirer {
+ /** The Kudu client object the AuthnTokenReacquirer is bound to. */
+ private final AsyncKuduClient client;
+
+ /** A dedicated synchronization object for #queuedRpcs */
+ private final Object queuedRpcsLock = new Object();
+
+ /**
+ * Container to store information on RPCs affected by authn token expiration error. The RPCs
+ * will be retried on successful token re-acquisition attempt or their errback() method
+ * will be called if authn token re-acquisition fails.
+ */
+ @GuardedBy("queuedRpcsLock")
+ private ArrayList<KuduRpc<?>> queuedRpcs = Lists.newArrayList();
+
+ /**
+ * Create a new AuthnTokenReacquirer object.
+ *
+ * @param client the Kudu client object
+ */
+ AuthnTokenReacquirer(AsyncKuduClient client) {
+ this.client = client;
+ }
+
+ /**
+ * Add information on the RPC which failed due to expired authentication token and requires a new
+ * authn token to retry. Calling this method triggers authn token re-acquisition if there is no
+ * active one yet.
+ *
+ * @param rpc the RPC which failed due to the expired authn token error
+ */
+ <R> void handleAuthnTokenExpiration(KuduRpc<R> rpc) {
+ boolean doReacquire = false;
+ synchronized (queuedRpcsLock) {
+ if (queuedRpcs.isEmpty()) {
+ // Using non-emptiness of the container as a state here. If the container is empty, that
+ // means a new re-acquisition round should be started. If the container is not empty,
+ // that means the process of token re-acquisition has already been started and the
+ // elements of the #queuedRpcs container should be processed once a new token
+ // re-acquisition completes (it could succeed or fail).
+ //
+ // TODO(aserbin): introduce a timestamp for the recently acquired authn token, so it would
+ // not try to re-acquire a token too often if there is a race between clearing
+ // the container after token acquisition is completed and scheduling token re-acquisition.
+ doReacquire = true;
+ }
+ queuedRpcs.add(rpc);
+ }
+ rpc.addTrace(new RpcTraceFrame.RpcTraceFrameBuilder(
+ rpc.method(), RpcTraceFrame.Action.GET_NEW_AUTHENTICATION_TOKEN_THEN_RETRY)
+ .build());
+
+ if (doReacquire) {
+ reacquireAuthnToken();
+ }
+ }
+
+ private List<KuduRpc<?>> swapQueuedRpcs() {
+ List<KuduRpc<?>> rpcList;
+ synchronized (queuedRpcsLock) {
+ rpcList = queuedRpcs;
+ queuedRpcs = Lists.newArrayList();
+ }
+ assert !rpcList.isEmpty();
+ return rpcList;
+ }
+
+ private void reacquireAuthnToken() {
+
+ /**
+ * A utility class providing callbacks for successful completion of authn token re-acquisition.
+ */
+ final class NewAuthnTokenCB implements Callback<Void, Boolean> {
+ /**
+ * Callback upon 'successful' completion of an attempt to acquire a new token,
+ * i.e. an attempt where no exception detected in the code path.
+ *
+ * @param tokenAcquired {@code true} if a new token acquired, {@code false} if
+ * the ConnectToCluster yielded no authn token.
+ */
+ @Override
+ public Void call(Boolean tokenAcquired) throws Exception {
+ // TODO(aserbin): do we need to handle a successful re-connect with no token some other way?
+ retryQueuedRpcs();
+ return null;
+ }
+
+ /**
+ * Handle the affected RPCs on the completion of authn token re-acquisition. The result authn
+ * token might be null, so in that case primary credentials will be used for future
+ * connection negotiations.
+ */
+ void retryQueuedRpcs() {
+ List<KuduRpc<?>> list = swapQueuedRpcs();
+ for (KuduRpc<?> rpc : list) {
+ client.handleRetryableErrorNoDelay(rpc, null);
+ }
+ }
+ }
+
+ /**
+ * Errback to retry authn token re-acquisition and handle the affected RPCs if the
+ * re-acquisition failed after some number of retries (currently, it's 5 attempts).
+ *
+ * TODO(aserbin): perhaps we should retry indefinitely with increasing backoff, but aggressively
+ * timeout RPCs in the queue after each failure.
+ */
+ final class NewAuthnTokenErrB implements Callback<Void, Exception> {
+ private static final int MAX_ATTEMPTS = 5;
+ private final NewAuthnTokenCB cb;
+ private int attempts = 0;
+
+ NewAuthnTokenErrB(NewAuthnTokenCB cb) {
+ this.cb = cb;
+ }
+
+ @Override
+ public Void call(Exception e) {
+ if (e instanceof RecoverableException && attempts < MAX_ATTEMPTS) {
+ client.reconnectToCluster(cb, this);
+ ++attempts;
+ return null;
+ }
+
+ failQueuedRpcs();
+ return null;
+ }
+
+ /** Handle the affected RPCs if authn token re-acquisition fails. */
+ void failQueuedRpcs() {
+ List<KuduRpc<?>> rpcList = swapQueuedRpcs();
+ for (KuduRpc<?> rpc : rpcList) {
+ Exception reason = new NonRecoverableException(Status.NotAuthorized(String.format(
+ "cannot re-acquire authentication token after %d attempts", MAX_ATTEMPTS)));
+ rpc.errback(reason);
+ }
+ }
+ }
+
+ final NewAuthnTokenCB newTokenCb = new NewAuthnTokenCB();
+ final NewAuthnTokenErrB newTokenErrb = new NewAuthnTokenErrB(newTokenCb);
+ client.reconnectToCluster(newTokenCb, newTokenErrb);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kudu/blob/603c1578/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
index dfba55d..3f9c9e5 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
@@ -131,13 +131,15 @@ final class ConnectToCluster {
* @param masterAddresses the addresses of masters to fetch from
* @param parentRpc RPC that prompted a master lookup, can be null
* @param defaultTimeoutMs timeout to use for RPCs if the parentRpc has no timeout
+ * @param credentialsPolicy credentials policy to use for connection negotiation
* @return a Deferred object for the cluster connection status
*/
public static Deferred<ConnectToClusterResponse> run(
KuduTable masterTable,
List<HostAndPort> masterAddresses,
KuduRpc<?> parentRpc,
- long defaultTimeoutMs) {
+ long defaultTimeoutMs,
+ Connection.CredentialsPolicy credentialsPolicy) {
ConnectToCluster connector = new ConnectToCluster(masterAddresses);
// Try to connect to each master. The ConnectToCluster instance
@@ -145,7 +147,8 @@ final class ConnectToCluster {
// deferred.
for (HostAndPort hostAndPort : masterAddresses) {
Deferred<ConnectToMasterResponsePB> d;
- RpcProxy proxy = masterTable.getAsyncClient().newMasterRpcProxy(hostAndPort);
+ RpcProxy proxy = masterTable.getAsyncClient().newMasterRpcProxy(
+ hostAndPort, credentialsPolicy);
if (proxy != null) {
d = connectToMaster(masterTable, proxy, parentRpc, defaultTimeoutMs);
} else {
http://git-wip-us.apache.org/repos/asf/kudu/blob/603c1578/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
index eea1b82..5034e5b 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
@@ -60,7 +60,6 @@ import org.jboss.netty.util.HashedWheelTimer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.kudu.client.Negotiator.Result;
import org.apache.kudu.rpc.RpcHeader;
import org.apache.kudu.rpc.RpcHeader.RpcFeatureFlag;
@@ -77,23 +76,46 @@ import org.apache.kudu.rpc.RpcHeader.RpcFeatureFlag;
* Acquiring the monitor on an object of this class will prevent it from
* accepting write requests as well as buffering requests if the underlying
* channel isn't connected.
+ *
* TODO(aserbin) clarify on the socketReadTimeoutMs and using per-RPC timeout settings.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
class Connection extends SimpleChannelUpstreamHandler {
+ /**
+ * Authentication credentials policy for negotiating outbound connections. Some requests
+ * (e.g. {@link ConnectToMasterRequest}) behave differently depending on the type of credentials
+ * used for authentication when negotiating on the underlying connection. If some particular
+ * behavior is required, it's necessary to specify appropriate credentials policy while creating
+ * an instance of this object.
+ */
+ public enum CredentialsPolicy {
+ /** It's acceptable to use authentication credentials of any type, primary or secondary ones. */
+ ANY_CREDENTIALS,
+
+ /**
+ * Only primary credentials are acceptable. Primary credentials are Kerberos tickets,
+ * TLS certificate. Secondary credentials are authentication tokens: they are 'derived'
+ * in the sense that it's possible to acquire them using 'primary' credentials.
+ */
+ PRIMARY_CREDENTIALS,
+ }
+
/** Information on the target server. */
private final ServerInfo serverInfo;
/** Security context to use for connection negotiation. */
private final SecurityContext securityContext;
- /** Read timeout for the connection (used by Netty's ReadTimeoutHandler) */
+ /** Read timeout for the connection (used by Netty's ReadTimeoutHandler). */
private final long socketReadTimeoutMs;
- /** Timer to monitor read timeouts for the connection (used by Netty's ReadTimeoutHandler) */
+ /** Timer to monitor read timeouts for the connection (used by Netty's ReadTimeoutHandler). */
private final HashedWheelTimer timer;
+ /** Credentials policy to use when authenticating. */
+ private final CredentialsPolicy credentialsPolicy;
+
/** The underlying Netty's socket channel. */
private final SocketChannel channel;
@@ -118,7 +140,7 @@ class Connection extends SimpleChannelUpstreamHandler {
/** Lock to guard access to some of the fields below. */
private final ReentrantLock lock = new ReentrantLock();
- /** A state of this object. */
+ /** The current state of this Connection object. */
@GuardedBy("lock")
private State state;
@@ -135,24 +157,43 @@ class Connection extends SimpleChannelUpstreamHandler {
@GuardedBy("lock")
private ArrayList<QueuedMessage> queuedMessages = Lists.newArrayList();
- /** The result of the connection negotiation. */
+ /** The result of the successful connection negotiation. */
@GuardedBy("lock")
- private Result negotiationResult = null;
+ private Negotiator.Success negotiationResult = null;
+
+ /** The result of failed connection negotiation. */
+ @GuardedBy("lock")
+ private Negotiator.Failure negotiationFailure = null;
/** A monotonically increasing counter for RPC IDs. */
@GuardedBy("lock")
private int nextCallId = 0;
+ /**
+ * Create a new Connection object to the specified destination.
+ *
+ * @param serverInfo the destination server
+ * @param securityContext security context to use for connection negotiation
+ * @param socketReadTimeoutMs timeout for the read operations on the socket
+ * @param timer timer to set up read timeout on the corresponding Netty channel
+ * @param channelFactory Netty factory to create corresponding Netty channel
+ * @param credentialsPolicy policy controlling which credentials to use while negotiating on the
+ * connection to the target server:
+ * if {@link CredentialsPolicy#PRIMARY_CREDENTIALS}, the authentication
+ * token from the security context is ignored
+ */
Connection(ServerInfo serverInfo,
SecurityContext securityContext,
long socketReadTimeoutMs,
HashedWheelTimer timer,
- ClientSocketChannelFactory channelFactory) {
+ ClientSocketChannelFactory channelFactory,
+ CredentialsPolicy credentialsPolicy) {
this.serverInfo = serverInfo;
this.securityContext = securityContext;
this.state = State.NEW;
this.socketReadTimeoutMs = socketReadTimeoutMs;
this.timer = timer;
+ this.credentialsPolicy = credentialsPolicy;
final ConnectionPipeline pipeline = new ConnectionPipeline();
pipeline.init();
@@ -179,7 +220,8 @@ class Connection extends SimpleChannelUpstreamHandler {
lock.unlock();
}
Channels.write(channel, ChannelBuffers.wrappedBuffer(CONNECTION_HEADER));
- Negotiator negotiator = new Negotiator(serverInfo.getHostname(), securityContext);
+ Negotiator negotiator = new Negotiator(serverInfo.getHostname(), securityContext,
+ (credentialsPolicy == CredentialsPolicy.PRIMARY_CREDENTIALS));
ctx.getPipeline().addBefore(ctx.getName(), "negotiation", negotiator);
negotiator.sendHello(channel);
}
@@ -198,7 +240,7 @@ class Connection extends SimpleChannelUpstreamHandler {
@Override
public void channelDisconnected(final ChannelHandlerContext ctx,
final ChannelStateEvent e) throws Exception {
- // No need to call super.channelClosed(ctx, e) -- there should be nobody in the upstream
+ // No need to call super.channelDisconnected(ctx, e) -- there should be nobody in the upstream
// pipeline after Connection itself. So, just handle the disconnection event ourselves.
cleanup("connection disconnected");
}
@@ -216,15 +258,17 @@ class Connection extends SimpleChannelUpstreamHandler {
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt) throws Exception {
Object m = evt.getMessage();
- if (m instanceof Negotiator.Result) {
+
+ // Process the results of a successful negotiation.
+ if (m instanceof Negotiator.Success) {
lock.lock();
try {
Preconditions.checkState(state == State.NEGOTIATING);
- Preconditions.checkNotNull(queuedMessages);
+ Preconditions.checkState(inflightMessages.isEmpty());
state = State.READY;
- negotiationResult = (Negotiator.Result) m;
- List<QueuedMessage> queued = queuedMessages;
+ negotiationResult = (Negotiator.Success) m;
+ List<QueuedMessage> queued = Preconditions.checkNotNull(queuedMessages);
// The queuedMessages should not be used anymore once the connection is negotiated.
queuedMessages = null;
// Send out all the enqueued messages. This is done while holding the lock to preserve
@@ -239,6 +283,24 @@ class Connection extends SimpleChannelUpstreamHandler {
return;
}
+ // Process the results of a failed negotiation.
+ if (m instanceof Negotiator.Failure) {
+ lock.lock();
+ try {
+ Preconditions.checkState(state == State.NEGOTIATING);
+ Preconditions.checkState(inflightMessages.isEmpty());
+
+ state = State.NEGOTIATION_FAILED;
+ negotiationFailure = (Negotiator.Failure) m;
+ } finally {
+ lock.unlock();
+ }
+ // Calling Channels.close() triggers the cleanup() which will handle the negotiation
+ // failure appropriately.
+ Channels.close(evt.getChannel());
+ return;
+ }
+
// Some other event which the connection does not handle.
if (!(m instanceof CallResponse)) {
ctx.sendUpstream(evt);
@@ -320,10 +382,13 @@ class Connection extends SimpleChannelUpstreamHandler {
} else {
LOG.error("{} unexpected exception from downstream on {}: {}", getLogPrefix(), c, e);
}
+
if (c.isOpen()) {
- Channels.close(c); // Will trigger channelClosed(), which will cleanup()
- } else { // else: presumably a connection timeout.
- cleanup(e.getMessage()); // => need to cleanup() from here directly.
+ // Calling Channels.close() will trigger channelClosed(), which will call cleanup().
+ Channels.close(c);
+ } else {
+ // Presumably a connection timeout: initiating the clean-up directly from here.
+ cleanup(e.getMessage());
}
}
@@ -332,13 +397,16 @@ class Connection extends SimpleChannelUpstreamHandler {
return serverInfo;
}
- /**
- * @return true iff the connection is in the DISCONNECTED state
- */
- boolean isDisconnected() {
+ /** The credentials policy used for the connection negotiation. */
+ CredentialsPolicy getCredentialsPolicy() {
+ return credentialsPolicy;
+ }
+
+ /** @return true iff the connection is in the TERMINATED state */
+ boolean isTerminated() {
lock.lock();
try {
- return state == State.DISCONNECTED;
+ return state == State.TERMINATED;
} finally {
lock.unlock();
}
@@ -363,9 +431,7 @@ class Connection extends SimpleChannelUpstreamHandler {
return features;
}
- /**
- * @return string representation of the peer (i.e. the server) information suitable for logging
- */
+ /** @return string representation of the peer information suitable for logging */
String getLogPrefix() {
return "[peer " + serverInfo.getUuid() + "]";
}
@@ -380,10 +446,9 @@ class Connection extends SimpleChannelUpstreamHandler {
throws RecoverableException {
lock.lock();
try {
- if (state == State.DISCONNECTED) {
+ if (state == State.TERMINATED) {
// The upper-level caller should handle the exception and retry using a new connection.
- throw new RecoverableException(Status.IllegalState(
- "connection in DISCONNECTED state; cannot enqueue a message"));
+ throw new RecoverableException(Status.IllegalState("connection is terminated"));
}
if (state == State.NEW) {
@@ -457,9 +522,7 @@ class Connection extends SimpleChannelUpstreamHandler {
return d;
}
- /**
- * @return string representation of this object (suitable for printing into the logs, etc.)
- */
+ /** @return string representation of this object (suitable for printing into the logs, etc.) */
public String toString() {
final StringBuilder buf = new StringBuilder();
buf.append("Connection@")
@@ -503,14 +566,13 @@ class Connection extends SimpleChannelUpstreamHandler {
private void sendCallToWire(final RpcOutboundMessage msg, Callback<Void, CallResponseInfo> cb) {
Preconditions.checkState(lock.isHeldByCurrentThread());
Preconditions.checkState(state == State.READY);
- Preconditions.checkNotNull(inflightMessages);
if (LOG.isTraceEnabled()) {
LOG.trace("{} sending {}", getLogPrefix(), msg);
}
final int callId = msg.getHeaderBuilder().getCallId();
final Callback<Void, CallResponseInfo> empty = inflightMessages.put(callId, cb);
- Preconditions.checkArgument(empty == null);
+ Preconditions.checkState(empty == null);
Channels.write(channel, msg);
}
@@ -526,13 +588,22 @@ class Connection extends SimpleChannelUpstreamHandler {
List<QueuedMessage> queued;
Map<Integer, Callback<Void, CallResponseInfo>> inflight;
+ boolean needNewAuthnToken = false;
lock.lock();
try {
- if (state == State.DISCONNECTED) {
+ if (state == State.TERMINATED) {
+ // The cleanup has already run.
Preconditions.checkState(queuedMessages == null);
Preconditions.checkState(inflightMessages == null);
return;
}
+ if (state == State.NEGOTIATION_FAILED) {
+ Preconditions.checkState(negotiationFailure != null);
+ Preconditions.checkState(inflightMessages.isEmpty());
+ needNewAuthnToken = negotiationFailure.status.getCode().equals(
+ RpcHeader.ErrorStatusPB.RpcErrorCodePB.FATAL_INVALID_AUTHENTICATION_TOKEN);
+ }
+ LOG.debug("{} cleaning up while in state {} due to: {}", getLogPrefix(), state, errorMessage);
queued = queuedMessages;
queuedMessages = null;
@@ -540,13 +611,14 @@ class Connection extends SimpleChannelUpstreamHandler {
inflight = inflightMessages;
inflightMessages = null;
- state = State.DISCONNECTED;
+ state = State.TERMINATED;
} finally {
lock.unlock();
}
final Status error = Status.NetworkError(getLogPrefix() + " " +
(errorMessage == null ? "connection reset" : errorMessage));
- final RecoverableException exception = new RecoverableException(error);
+ final RecoverableException exception =
+ needNewAuthnToken ? new InvalidAuthnTokenException(error) : new RecoverableException(error);
for (Callback<Void, CallResponseInfo> cb : inflight.values()) {
try {
@@ -575,13 +647,36 @@ class Connection extends SimpleChannelUpstreamHandler {
channel.connect(new InetSocketAddress(serverInfo.getResolvedAddress(), serverInfo.getPort()));
}
- /** State of the Connection object. */
+ /** Enumeration to represent the internal state of the Connection object. */
private enum State {
- NEW, // The object has just been created.
- CONNECTING, // The establishment of TCP connection to the server has started.
- NEGOTIATING, // The connection negotiation has started.
- READY, // The connection to the server has been opened, negotiated, and ready to use.
- DISCONNECTED, // The TCP connection has been dropped off.
+ /** The object has just been created. */
+ NEW,
+
+ /** The establishment of TCP connection to the server has started. */
+ CONNECTING,
+
+ /** The connection negotiation has started. */
+ NEGOTIATING,
+
+ /**
+ * The underlying TCP connection has been dropped off due to negotiation error and there are
+ * enqueued messages to handle. Once connection negotiation fails, the Connection object
+ * handles the affected queued RPCs appropriately. If the negotiation failed due to invalid
+ * authn token error, the upper-level code may attempt to acquire a new authentication token
+ * in that case. The connection transitions into the TERMINATED state upon notifying the
+ * affected RPCs on the connection negotiation failure.
+ */
+ NEGOTIATION_FAILED,
+
+ /** The connection to the server is opened, negotiated, and ready to use. */
+ READY,
+
+ /**
+ * The TCP connection has been dropped off, the proper clean-up procedure has run and no queued
+ * nor in-flight messages are left. In this state, the object does not accept new messages,
+ * throwing RecoverableException upon call of the enqueueMessage() method.
+ */
+ TERMINATED,
}
/**
http://git-wip-us.apache.org/repos/asf/kudu/blob/603c1578/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
index 844e79f..53e8e59 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
@@ -18,12 +18,13 @@
package org.apache.kudu.client;
import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.Set;
import javax.annotation.concurrent.GuardedBy;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.stumbleupon.async.Deferred;
import org.apache.yetus.audience.InterfaceAudience;
@@ -37,16 +38,20 @@ import org.jboss.netty.util.HashedWheelTimer;
* There should only be one instance of ConnectionCache per Kudu client, and it should not be
* shared between clients.
* <p>
- * Disconnected instances of the {@link Connection} class are replaced in the cache with instances
- * when {@link #getConnection(ServerInfo)) method is called with the same destination. Since the map
- * is keyed by UUID of the server, it would require an ever-growing set of unique Kudu servers
- * to encounter memory issues.
+ * Disconnected instances of the {@link Connection} class are replaced in the cache with new ones
+ * when {@link #getConnection(ServerInfo, Connection.CredentialsPolicy)} method is called with the
+ * same destination and matching credentials policy. Since the map is keyed by the UUID of the
+ * target server, the theoretical maximum number of elements in the cache is twice the number of
+ * all servers in the cluster (i.e. both masters and tablet servers). However, in practice it's
+ * 2 * number of masters + number of tablet servers since tablet servers do not require connections
+ * negotiated with primary credentials.
*
* This class is thread-safe.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
class ConnectionCache {
+
/** Security context to use for connection negotiation. */
private final SecurityContext securityContext;
@@ -59,15 +64,15 @@ class ConnectionCache {
/** Netty's channel factory to use by connections. */
private final ClientSocketChannelFactory channelFactory;
- /** Synchronization primitive to guard access to the fields below. */
- private final ReentrantLock lock = new ReentrantLock();
-
- @GuardedBy("lock")
- private final HashMap<String, Connection> uuid2connection = new HashMap<>();
-
/**
- * Create a new empty ConnectionCache given the specified parameters.
+ * Container mapping a server's UUID to the connections established from the client to that
+ * server. There may be up to two connections per server: one established with secondary
+ * credentials (e.g. authn token), another with primary ones (e.g. Kerberos credentials).
*/
+ @GuardedBy("uuid2connection")
+ private final HashMultimap<String, Connection> uuid2connection = HashMultimap.create();
+
+ /** Create a new empty ConnectionCache given the specified parameters. */
ConnectionCache(SecurityContext securityContext,
long socketReadTimeoutMs,
HashedWheelTimer timer,
@@ -84,59 +89,81 @@ class ConnectionCache {
* created connection is not negotiated until enqueuing the first RPC to the target server.
*
* @param serverInfo the server end-point to connect to
+ * @param credentialsPolicy authentication credentials policy for the connection negotiation
* @return instance of this object with the specified destination
*/
- public Connection getConnection(final ServerInfo serverInfo) {
- Connection connection;
-
- lock.lock();
- try {
- // First try to find an existing connection.
- connection = uuid2connection.get(serverInfo.getUuid());
- if (connection == null || connection.isDisconnected()) {
- // If no valid connection is found, create a new one.
- connection = new Connection(serverInfo, securityContext,
- socketReadTimeoutMs, timer, channelFactory);
- uuid2connection.put(serverInfo.getUuid(), connection);
+ public Connection getConnection(final ServerInfo serverInfo,
+ Connection.CredentialsPolicy credentialsPolicy) {
+ Connection result = null;
+ synchronized (uuid2connection) {
+ // Create and register a new connection object into the cache if one of the following is true:
+ //
+ // * There isn't a registered connection to the specified destination.
+ //
+ // * There is a connection to the specified destination, but it's in TERMINATED state.
+ // Such connections cannot be used again and should be recycled. The connection cache
+ // lazily removes such entries.
+ //
+ // * A connection negotiated with primary credentials is requested but the only registered
+ // one does not have such property. In this case, the already existing connection
+ // (negotiated with secondary credentials, i.e. authn token) is kept in the cache and
+ // a new one is created to be open and negotiated with primary credentials. The newly
+ // created connection is put into the cache along with old one. We don't do anything
+ // special to the old connection to shut it down since it may be still in use. We rely
+ // on the server to close inactive connections in accordance with their TTL settings.
+ //
+ final Set<Connection> connections = uuid2connection.get(serverInfo.getUuid());
+ Iterator<Connection> it = connections.iterator();
+ while (it.hasNext()) {
+ Connection c = it.next();
+ if (c.isTerminated()) {
+ // Lazy recycling of the terminated connections: removing them from the cache upon
+ // an attempt to connect to the same destination again.
+ it.remove();
+ continue;
+ }
+ if (credentialsPolicy == Connection.CredentialsPolicy.ANY_CREDENTIALS ||
+ credentialsPolicy == c.getCredentialsPolicy()) {
+ // If the connection policy allows for using any credentials or the connection is
+ // negotiated using the given credentials type, this is the connection we are looking for.
+ result = c;
+ }
+ }
+ if (result == null) {
+ result = new Connection(serverInfo, securityContext,
+ socketReadTimeoutMs, timer, channelFactory, credentialsPolicy);
+ connections.add(result);
+ // There can be at most 2 connections to the same destination: one with primary and another
+ // with secondary credentials.
+ assert connections.size() <= 2;
}
- } finally {
- lock.unlock();
}
- return connection;
+ return result;
}
- /**
- * Asynchronously terminate every connection. This also cancels all the pending and in-flight
- * RPCs.
- */
+ /** Asynchronously terminate every connection. This cancels all the pending and in-flight RPCs. */
Deferred<ArrayList<Void>> disconnectEverything() {
- lock.lock();
- try {
- ArrayList<Deferred<Void>> deferreds = new ArrayList<>(uuid2connection.size());
+ synchronized (uuid2connection) {
+ List<Deferred<Void>> deferreds = new ArrayList<>(uuid2connection.size());
for (Connection c : uuid2connection.values()) {
deferreds.add(c.shutdown());
}
return Deferred.group(deferreds);
- } finally {
- lock.unlock();
}
}
/**
* Return a copy of the all-connections-list. This method is exposed only to allow
- * {@ref AsyncKuduClient} to forward it, so tests could get access to the underlying elements
+ * {@link AsyncKuduClient} to forward it, so tests could get access to the underlying elements
* of the cache.
*
* @return a copy of the list of all connections in the connection cache
*/
@VisibleForTesting
List<Connection> getConnectionListCopy() {
- lock.lock();
- try {
+ synchronized (uuid2connection) {
return ImmutableList.copyOf(uuid2connection.values());
- } finally {
- lock.unlock();
}
}
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/603c1578/java/kudu-client/src/main/java/org/apache/kudu/client/InvalidAuthnTokenException.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/InvalidAuthnTokenException.java b/java/kudu-client/src/main/java/org/apache/kudu/client/InvalidAuthnTokenException.java
new file mode 100644
index 0000000..2f88414
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/InvalidAuthnTokenException.java
@@ -0,0 +1,39 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kudu.client;
+
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+/**
+ * Exception for notifying on invalid authn token. In most use cases in the Kudu Java client code,
+ * 'invalid authn token' means 'expired authn token'. Receiving this exception means the current
+ * authentication token is no longer valid and a new one is needed to establish connections to
+ * the Kudu servers for sending RPCs.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+class InvalidAuthnTokenException extends RecoverableException {
+ /**
+ * @param status status object describing the reason for the exception
+ */
+ InvalidAuthnTokenException(Status status) {
+ super(status);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kudu/blob/603c1578/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
index a917af2..4736b37 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
@@ -84,11 +84,11 @@ import org.apache.kudu.util.SecurityUtil;
/**
* Netty Pipeline handler which runs connection negotiation with
* the server. When negotiation is complete, this removes itself
- * from the pipeline and fires a Negotiator.Result upstream.
+ * from the pipeline and fires a Negotiator.Success or Negotiator.Failure upstream.
*/
@InterfaceAudience.Private
public class Negotiator extends SimpleChannelUpstreamHandler {
- static final Logger LOG = LoggerFactory.getLogger(Negotiator.class);
+ private static final Logger LOG = LoggerFactory.getLogger(Negotiator.class);
private static final SaslClientCallbackHandler SASL_CALLBACK = new SaslClientCallbackHandler();
private static final Set<RpcHeader.RpcFeatureFlag> SUPPORTED_RPC_FEATURES =
@@ -164,10 +164,16 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
@VisibleForTesting
boolean overrideLoopbackForTests;
- public Negotiator(String remoteHostname, SecurityContext securityContext) {
+ public Negotiator(String remoteHostname,
+ SecurityContext securityContext,
+ boolean ignoreAuthnToken) {
this.remoteHostname = remoteHostname;
this.securityContext = securityContext;
- this.authnToken = securityContext.getAuthenticationToken();
+ if (ignoreAuthnToken) {
+ this.authnToken = null;
+ } else {
+ this.authnToken = securityContext.getAuthenticationToken();
+ }
}
public void sendHello(Channel channel) {
@@ -227,9 +233,22 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
private void handleResponse(Channel chan, CallResponse callResponse)
throws IOException {
- // TODO(todd): this needs to handle error responses, not just success responses.
- RpcHeader.NegotiatePB response = parseSaslMsgResponse(callResponse);
+ final RpcHeader.ResponseHeader header = callResponse.getHeader();
+ if (header.getIsError()) {
+ final RpcHeader.ErrorStatusPB.Builder errBuilder = RpcHeader.ErrorStatusPB.newBuilder();
+ KuduRpc.readProtobuf(callResponse.getPBMessage(), errBuilder);
+ final RpcHeader.ErrorStatusPB error = errBuilder.build();
+ LOG.debug("peer {} sent connection negotiation error: {}",
+ chan.getRemoteAddress(), error.getMessage());
+
+ // The upstream code should handle the negotiation failure.
+ state = State.FINISHED;
+ chan.getPipeline().remove(this);
+ Channels.fireMessageReceived(chan, new Failure(error));
+ return;
+ }
+ RpcHeader.NegotiatePB response = parseSaslMsgResponse(callResponse);
// TODO: check that the message type matches the expected one in all
// of the below implementations.
switch (state) {
@@ -490,7 +509,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
// TODO(danburkert): is this a correct assumption? would the
// client ever be "done" but also produce handshake data?
// if it did, would we want to encrypt the SSL message or no?
- assert data.size() == 0;
+ assert data.isEmpty();
return false;
} else {
assert data.size() > 0;
@@ -621,7 +640,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
chan.getPipeline().remove(this);
Channels.write(chan, makeConnectionContext());
- Channels.fireMessageReceived(chan, new Result(serverFeatures));
+ Channels.fireMessageReceived(chan, new Success(serverFeatures));
}
private RpcOutboundMessage makeConnectionContext() throws SaslException {
@@ -686,11 +705,24 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
* The results of a successful negotiation. This is sent to upstream handlers in the
* Netty pipeline after negotiation completes.
*/
- static class Result {
+ static class Success {
final Set<RpcFeatureFlag> serverFeatures;
- public Result(Set<RpcFeatureFlag> serverFeatures) {
+ public Success(Set<RpcFeatureFlag> serverFeatures) {
this.serverFeatures = serverFeatures;
}
}
+
+ /**
+ * The results of a failed negotiation. This is sent to upstream handlers in the Netty pipeline
+ * when a negotiation fails.
+ */
+ static class Failure {
+ /** The RPC error received from the server. */
+ final RpcHeader.ErrorStatusPB status;
+
+ public Failure(RpcHeader.ErrorStatusPB status) {
+ this.status = status;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/603c1578/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
index 3f8de9b..42379ee 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
@@ -27,6 +27,7 @@
package org.apache.kudu.client;
import java.util.Set;
+import javax.annotation.Nonnull;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -56,14 +57,16 @@ import org.apache.kudu.util.Pair;
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
-public class RpcProxy {
+class RpcProxy {
private static final Logger LOG = LoggerFactory.getLogger(RpcProxy.class);
/** The reference to the top-level Kudu client object. */
+ @Nonnull
private final AsyncKuduClient client;
/** The reference to the object representing connection to the target server. */
+ @Nonnull
private final Connection connection;
/**
@@ -73,10 +76,8 @@ public class RpcProxy {
* @param connection the connection associated with the target Kudu server
*/
RpcProxy(AsyncKuduClient client, Connection connection) {
- Preconditions.checkNotNull(client);
- Preconditions.checkNotNull(connection);
- this.client = client;
- this.connection = connection;
+ this.client = Preconditions.checkNotNull(client);
+ this.connection = Preconditions.checkNotNull(connection);
}
/**
@@ -138,7 +139,7 @@ public class RpcProxy {
});
} catch (RecoverableException e) {
// This is to handle RecoverableException(Status.IllegalState()) from
- // Connection.enqueueMessage() if the connection turned into the DISCONNECTED state.
+ // Connection.enqueueMessage() if the connection turned into the TERMINATED state.
client.handleRetryableError(rpc, e);
} catch (Exception e) {
rpc.errback(e);
@@ -190,8 +191,6 @@ public class RpcProxy {
final KuduRpc<R> rpc,
CallResponse response,
KuduException ex) {
- Preconditions.checkNotNull(rpc);
-
final long start = System.nanoTime();
if (LOG.isTraceEnabled()) {
if (response == null) {
@@ -199,7 +198,6 @@ public class RpcProxy {
connection.getLogPrefix(), rpc);
} else {
RpcHeader.ResponseHeader header = response.getHeader();
- Preconditions.checkNotNull(header);
LOG.trace("{} received response with rpcId {}, size {} for RPC {}",
connection.getLogPrefix(), header.getCallId(),
response.getTotalResponseSize(), rpc);
@@ -210,6 +208,10 @@ public class RpcProxy {
rpc.method(), RpcTraceFrame.Action.RECEIVE_FROM_SERVER).serverInfo(
connection.getServerInfo());
if (ex != null) {
+ if (ex instanceof InvalidAuthnTokenException) {
+ client.handleInvalidToken(rpc);
+ return;
+ }
if (ex instanceof RecoverableException) {
// This check is specifically for the ERROR_SERVER_TOO_BUSY, ERROR_UNAVAILABLE and alike.
failOrRetryRpc(client, connection, rpc, (RecoverableException) ex);
@@ -280,8 +282,8 @@ public class RpcProxy {
connection.getLogPrefix(), e, header.getCallId(), rpc);
}
if (LOG.isTraceEnabled()) {
- LOG.trace("------------------<< LEAVING DECODE <<------------------" +
- " time elapsed: " + ((System.nanoTime() - start) / 1000) + "us");
+ LOG.trace("------------------<< LEAVING DECODE <<------------------ time elapsed: {} us",
+ ((System.nanoTime() - start) / 1000));
}
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/603c1578/java/kudu-client/src/main/java/org/apache/kudu/client/RpcTraceFrame.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RpcTraceFrame.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcTraceFrame.java
index 8ec226b..15cf40e 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RpcTraceFrame.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcTraceFrame.java
@@ -52,6 +52,12 @@ class RpcTraceFrame {
sb.append(trace.getStatus());
}
},
+ // Waiting for a new authn token to re-send the request.
+ GET_NEW_AUTHENTICATION_TOKEN_THEN_RETRY {
+ void appendToStringBuilder(RpcTraceFrame trace, StringBuilder sb) {
+ sb.append("waiting for new authn token");
+ }
+ },
// After having figured out that we don't know where the RPC is going,
// before querying the master.
QUERY_MASTER {
http://git-wip-us.apache.org/repos/asf/kudu/blob/603c1578/java/kudu-client/src/main/java/org/apache/kudu/client/SecurityContext.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/SecurityContext.java b/java/kudu-client/src/main/java/org/apache/kudu/client/SecurityContext.java
index ac75881..0340bfd 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/SecurityContext.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/SecurityContext.java
@@ -51,6 +51,7 @@ import org.apache.kudu.security.Token.TokenPB;
*/
class SecurityContext {
@GuardedBy("this")
+ @Nullable
private SignedTokenPB authnToken;
private final DelegatedTrustManager trustManager = new DelegatedTrustManager();
@@ -76,7 +77,12 @@ class SecurityContext {
*/
private List<ByteString> trustedCertDers = Collections.emptyList();
- public SecurityContext(Subject subject) {
+ /**
+ * Construct SecurityContext object with the specified JAAS subject.
+ *
+ * @param subject JAAS Subject that the client's credentials are stored in
+ */
+ SecurityContext(Subject subject) {
try {
this.subject = subject;
@@ -85,16 +91,17 @@ class SecurityContext {
this.sslContextTrustAny = SSLContext.getInstance("TLS");
sslContextTrustAny.init(null, new TrustManager[] { new TrustAnyCert() }, null);
-
} catch (Exception e) {
throw new RuntimeException(e);
}
}
+ @Nullable
public Subject getSubject() {
return subject;
}
+ @Nullable
public synchronized byte[] exportAuthenticationCredentials() {
if (authnToken == null || !hasTrustedCerts()) {
return null;
@@ -148,6 +155,7 @@ class SecurityContext {
/**
* @return the current authentication token, or null if we have no valid token
*/
+ @Nullable
public synchronized SignedTokenPB getAuthenticationToken() {
return authnToken;
}
@@ -260,7 +268,7 @@ class SecurityContext {
* can be swapped out atomically.
*/
private static class DelegatedTrustManager implements X509TrustManager {
- AtomicReference<X509TrustManager> delegate = new AtomicReference<>();
+ final AtomicReference<X509TrustManager> delegate = new AtomicReference<>();
@Override
public void checkClientTrusted(X509Certificate[] chain, String authType)
@@ -279,4 +287,5 @@ class SecurityContext {
return delegate.get().getAcceptedIssuers();
}
}
+
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/603c1578/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
index 5e33203..bd1c0e9 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
@@ -102,7 +102,7 @@ public class TestAsyncKuduClient extends BaseKuduTest {
boolean sleep = false;
if (!client.getConnectionListCopy().isEmpty()) {
for (Connection c : client.getConnectionListCopy()) {
- if (!c.isDisconnected()) {
+ if (!c.isTerminated()) {
sleep = true;
break;
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/603c1578/java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthnTokenReacquire.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthnTokenReacquire.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthnTokenReacquire.java
new file mode 100644
index 0000000..cfe9b5a
--- /dev/null
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthnTokenReacquire.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+package org.apache.kudu.client;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * This test contains scenarios to verify that client re-acquires authn token upon expiration
+ * of the current one and automatically retries the call.
+ */
+public class TestAuthnTokenReacquire extends BaseKuduTest {
+
+ private static final String TABLE_NAME = "TestAuthnTokenReacquire-table";
+ private static final int TOKEN_TTL_SEC = 1;
+ private static final int OP_TIMEOUT_MS = 60 * TOKEN_TTL_SEC * 1000;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ // Inject additional INVALID_AUTHENTICATION_TOKEN responses from both the master and tablet
+ // servers, even for not-yet-expired tokens.
+ miniClusterBuilder
+ .enableKerberos()
+ .addMasterFlag(String.format("--authn_token_validity_seconds=%d", TOKEN_TTL_SEC))
+ .addMasterFlag("--rpc_inject_invalid_authn_token_ratio=0.5")
+ .addTserverFlag("--rpc_inject_invalid_authn_token_ratio=0.5");
+
+ BaseKuduTest.setUpBeforeClass();
+ }
+
+ private static void dropConnections() {
+ for (Connection c : client.getConnectionListCopy()) {
+ c.disconnect();
+ }
+ }
+
+ private static void dropConnectionsAndExpireToken() throws InterruptedException {
+ // Drop all connections from the client to Kudu servers.
+ dropConnections();
+ // Wait for authn token expiration.
+ Thread.sleep(TOKEN_TTL_SEC * 1000);
+ }
+
+ @Test
+ public void testBasicMasterOperations() throws Exception {
+ // Run the master-only scenario from 8 truly concurrent threads; failures are collected per thread.
+ List<Thread> threads = new ArrayList<>();
+ final Map<Integer, Throwable> exceptions =
+ Collections.synchronizedMap(new HashMap<Integer, Throwable>());
+ for (int i = 0; i < 8; ++i) {
+ final int threadIdx = i;
+ Thread thread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ final String tableName = "TestAuthnTokenReacquire-table-" + threadIdx;
+ try {
+ ListTabletServersResponse response = syncClient.listTabletServers();
+ assertNotNull(response);
+ dropConnectionsAndExpireToken();
+
+ ListTablesResponse tableList = syncClient.getTablesList(tableName);
+ assertNotNull(tableList);
+ assertTrue(tableList.getTablesList().isEmpty());
+ dropConnectionsAndExpireToken();
+
+ syncClient.createTable(tableName, basicSchema, getBasicCreateTableOptions());
+ dropConnectionsAndExpireToken();
+
+ KuduTable table = syncClient.openTable(tableName);
+ assertEquals(basicSchema.getColumnCount(), table.getSchema().getColumnCount());
+ dropConnectionsAndExpireToken();
+
+ syncClient.deleteTable(tableName);
+ assertFalse(syncClient.tableExists(tableName));
+ } catch (Throwable e) {
+ //noinspection ThrowableResultOfMethodCallIgnored
+ exceptions.put(threadIdx, e);
+ }
+ }
+ });
+ thread.start(); // start(), not run(): run() would execute the body on the calling thread.
+ threads.add(thread);
+ }
+ for (Thread thread : threads) {
+ thread.join();
+ }
+ if (!exceptions.isEmpty()) {
+ for (Map.Entry<Integer, Throwable> e : exceptions.entrySet()) {
+ LOG.error("exception in thread {}: {}", e.getKey(), e.getValue());
+ }
+ fail("test failed: unexpected errors");
+ }
+ }
+
+ @Test
+ public void testBasicWorkflow() throws Exception {
+ KuduTable table = syncClient.createTable(TABLE_NAME, basicSchema,
+ getBasicCreateTableOptions());
+ dropConnectionsAndExpireToken();
+
+ KuduSession session = syncClient.newSession();
+ session.setTimeoutMillis(OP_TIMEOUT_MS);
+ session.apply(createBasicSchemaInsert(table, 1));
+ session.flush();
+ RowErrorsAndOverflowStatus errors = session.getPendingErrors();
+ assertFalse(errors.isOverflowed());
+ assertEquals(0, session.countPendingErrors());
+ dropConnectionsAndExpireToken();
+
+ KuduTable scanTable = syncClient.openTable(TABLE_NAME);
+ AsyncKuduScanner scanner = new AsyncKuduScanner.AsyncKuduScannerBuilder(client, scanTable)
+ .scanRequestTimeout(OP_TIMEOUT_MS)
+ .build();
+ assertEquals(1, countRowsInScan(scanner));
+ dropConnectionsAndExpireToken();
+
+ syncClient.deleteTable(TABLE_NAME);
+ assertFalse(syncClient.tableExists(TABLE_NAME));
+ }
+}
http://git-wip-us.apache.org/repos/asf/kudu/blob/603c1578/java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthnTokenReacquireOpen.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthnTokenReacquireOpen.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthnTokenReacquireOpen.java
new file mode 100644
index 0000000..d20d534
--- /dev/null
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthnTokenReacquireOpen.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+package org.apache.kudu.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * This test contains a special scenario to make sure the automatic authn token re-acquisition works
+ * in the case when the client has established a connection to the master using secondary
+ * credentials. The subtlety is that an authn token cannot be acquired using such a connection,
+ * so this test verifies that the client opens a new connection using its primary credentials to
+ * acquire a new authentication token and automatically retries its RPCs with the new authn token.
+ */
+public class TestAuthnTokenReacquireOpen extends BaseKuduTest {
+
+ private static final String TABLE_NAME = "TestAuthnTokenReacquireOpen-table";
+ private static final int TOKEN_TTL_SEC = 1;
+ private static final int OP_TIMEOUT_MS = 60 * TOKEN_TTL_SEC * 1000;
+ private static final int KEEPALIVE_TIME_MS = 2 * OP_TIMEOUT_MS;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ // Set appropriate TTL for authn token and connection keep-alive property, so the client could
+ // keep an open connection to the master when its authn token is already expired. Inject
+ // additional INVALID_AUTHENTICATION_TOKEN responses from the tablet server even for
+ // not-yet-expired tokens for an extra stress on the client.
+ miniClusterBuilder
+ .enableKerberos()
+ .addMasterFlag(String.format("--authn_token_validity_seconds=%d", TOKEN_TTL_SEC))
+ .addMasterFlag(String.format("--rpc_default_keepalive_time_ms=%d", KEEPALIVE_TIME_MS))
+ .addTserverFlag(String.format("--rpc_default_keepalive_time_ms=%d", KEEPALIVE_TIME_MS))
+ .addTserverFlag("--rpc_inject_invalid_authn_token_ratio=0.5");
+
+ // We want to have a cluster with a single master.
+ final int NUM_MASTERS = 1;
+ final int NUM_TABLET_SERVERS = 3;
+ doSetup(NUM_MASTERS, NUM_TABLET_SERVERS);
+ }
+
+ private static void dropConnections() {
+ for (Connection c : client.getConnectionListCopy()) {
+ c.disconnect();
+ }
+ }
+
+ private static void expireToken() throws InterruptedException {
+ // Wait for authn token expiration.
+ Thread.sleep(TOKEN_TTL_SEC * 1000);
+ }
+
+ @Test
+ public void test() throws Exception {
+ // Establish a connection to the cluster, get the list of tablet servers. That would fetch
+ // an authn token.
+ ListTabletServersResponse response = syncClient.listTabletServers();
+ assertNotNull(response);
+ dropConnections();
+
+ // The connection to the master has been dropped. Make a call to the master again so the client
+ // would create a new connection using authn token.
+ ListTablesResponse tableList = syncClient.getTablesList(null);
+ assertNotNull(tableList);
+ assertTrue(tableList.getTablesList().isEmpty());
+
+ syncClient.createTable(TABLE_NAME, basicSchema, getBasicCreateTableOptions());
+ assertTrue(syncClient.tableExists(TABLE_NAME));
+
+ expireToken();
+
+ // Try scan table rows once the authn token has expired. This request goes to corresponding
+ // tablet server, and a new connection should be negotiated. During connection negotiation,
+ // the server authenticates the client using authn token, which is expired.
+ KuduTable scanTable = syncClient.openTable(TABLE_NAME);
+ AsyncKuduScanner scanner = new AsyncKuduScanner.AsyncKuduScannerBuilder(client, scanTable)
+ .scanRequestTimeout(OP_TIMEOUT_MS)
+ .build();
+ assertEquals(0, countRowsInScan(scanner));
+
+ syncClient.deleteTable(TABLE_NAME);
+ assertFalse(syncClient.tableExists(TABLE_NAME));
+ }
+}
http://git-wip-us.apache.org/repos/asf/kudu/blob/603c1578/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
index 2404005..5b5d4cf 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
@@ -64,18 +64,18 @@ public class TestConnectionCache {
// 1 tserver and 3 masters and 3 connections from the newRpcProxy() in the loop above.
assertEquals(1 + 3 + 3, client.getConnectionListCopy().size());
- assertFalse(allConnectionsDisconnected(client));
+ assertFalse(allConnectionsTerminated(client));
final RpcProxy proxy = client.newRpcProxy(serverInfos.get(0));
// Disconnect from the server.
proxy.getConnection().disconnect().awaitUninterruptibly();
- waitForConnectionToClose(proxy.getConnection());
- assertTrue(proxy.getConnection().isDisconnected());
+ waitForConnectionToTerminate(proxy.getConnection());
+ assertTrue(proxy.getConnection().isTerminated());
// Make sure not all the connections in the connection cache are disconnected yet. Actually,
// only the connection to server '0' should be disconnected.
- assertFalse(allConnectionsDisconnected(client));
+ assertFalse(allConnectionsTerminated(client));
// For a new RpcProxy instance, a new connection to the same destination is established.
final RpcProxy newHelper = client.newRpcProxy(serverInfos.get(0));
@@ -96,9 +96,9 @@ public class TestConnectionCache {
// Test disconnecting and make sure we cleaned up all the connections.
for (Connection c : client.getConnectionListCopy()) {
c.disconnect().awaitUninterruptibly();
- waitForConnectionToClose(c);
+ waitForConnectionToTerminate(c);
}
- assertTrue(allConnectionsDisconnected(client));
+ assertTrue(allConnectionsTerminated(client));
} finally {
if (cluster != null) {
cluster.shutdown();
@@ -106,19 +106,19 @@ public class TestConnectionCache {
}
}
- private boolean allConnectionsDisconnected(AsyncKuduClient client) {
+ private boolean allConnectionsTerminated(AsyncKuduClient client) {
for (Connection c : client.getConnectionListCopy()) {
- if (!c.isDisconnected()) {
+ if (!c.isTerminated()) {
return false;
}
}
return true;
}
- private void waitForConnectionToClose(Connection c) throws InterruptedException {
+ private void waitForConnectionToTerminate(Connection c) throws InterruptedException {
DeadlineTracker deadlineTracker = new DeadlineTracker();
deadlineTracker.setDeadline(5000);
- while (!c.isDisconnected() && !deadlineTracker.timedOut()) {
+ while (!c.isTerminated() && !deadlineTracker.timedOut()) {
Thread.sleep(250);
}
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/603c1578/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
index ac3e52f..d10f7b0 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
@@ -48,7 +48,7 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.kudu.client.Negotiator.Result;
+import org.apache.kudu.client.Negotiator.Success;
import org.apache.kudu.rpc.RpcHeader.AuthenticationTypePB;
import org.apache.kudu.rpc.RpcHeader.ConnectionContextPB;
import org.apache.kudu.rpc.RpcHeader.NegotiatePB;
@@ -102,7 +102,7 @@ public class TestNegotiator {
}
private void startNegotiation(boolean fakeLoopback) {
- Negotiator negotiator = new Negotiator("127.0.0.1", secContext);
+ Negotiator negotiator = new Negotiator("127.0.0.1", secContext, false);
negotiator.overrideLoopbackForTests = fakeLoopback;
embedder = new DecoderEmbedder<Object>(negotiator);
negotiator.sendHello(embedder.getPipeline().getChannel());
@@ -135,19 +135,19 @@ public class TestNegotiator {
/**
* Checks that the client sends a connection context and then yields
- * a Negotiation.Result to the pipeline.
+ * a Negotiator.Success to the pipeline.
* @return the result
*/
- private Result assertComplete() {
+ private Success assertComplete() {
RpcOutboundMessage msg = (RpcOutboundMessage)embedder.poll();
ConnectionContextPB connCtx = (ConnectionContextPB)msg.getBody();
assertEquals(Negotiator.CONNECTION_CTX_CALL_ID, msg.getHeaderBuilder().getCallId());
assertEquals(System.getProperty("user.name"), connCtx.getDEPRECATEDUserInfo().getRealUser());
- // Expect the client to also emit a negotiation Result.
- Result result = (Result)embedder.poll();
- assertNotNull(result);
- return result;
+ // Expect the client to also emit a negotiation Success.
+ Success success = (Success)embedder.poll();
+ assertNotNull(success);
+ return success;
}
@Test