You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2017/07/12 21:50:03 UTC

kudu git commit: [java] KUDU-2013 re-acquire authn token if expired

Repository: kudu
Updated Branches:
  refs/heads/master 81939782a -> 603c1578c


[java] KUDU-2013 re-acquire authn token if expired

This patch introduces automatic authn token re-acquisition when the
current authn token expires.  The client automatically retries the RPC
that hits the token expiration error (the error to re-try is seen as
FATAL_INVALID_AUTHENTICATION_TOKEN sent by the server during connection
negotiation).

Added a few of tests to exercise the new retry logic for automatic
token re-acquisition in case of master-only operations, a bare minimum
workload scenario, and one special case of a connection to the master
opened with secondary credentials.

Change-Id: I0be620629c9a8345ecd5e5679c80ee76ca4eaa57
Reviewed-on: http://gerrit.cloudera.org:8080/7250
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/603c1578
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/603c1578
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/603c1578

Branch: refs/heads/master
Commit: 603c1578c78c0377ffafdd9c427ebfd8a206bda3
Parents: 8193978
Author: Alexey Serbin <as...@cloudera.com>
Authored: Tue Jun 20 15:25:42 2017 -0700
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Wed Jul 12 21:49:15 2017 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/AsyncKuduClient.java | 135 +++++++++++---
 .../kudu/client/AuthnTokenReacquirer.java       | 178 +++++++++++++++++++
 .../apache/kudu/client/ConnectToCluster.java    |   7 +-
 .../java/org/apache/kudu/client/Connection.java | 177 +++++++++++++-----
 .../org/apache/kudu/client/ConnectionCache.java | 111 +++++++-----
 .../kudu/client/InvalidAuthnTokenException.java |  39 ++++
 .../java/org/apache/kudu/client/Negotiator.java |  52 ++++--
 .../java/org/apache/kudu/client/RpcProxy.java   |  24 +--
 .../org/apache/kudu/client/RpcTraceFrame.java   |   6 +
 .../org/apache/kudu/client/SecurityContext.java |  15 +-
 .../apache/kudu/client/TestAsyncKuduClient.java |   2 +-
 .../kudu/client/TestAuthnTokenReacquire.java    | 143 +++++++++++++++
 .../client/TestAuthnTokenReacquireOpen.java     |  99 +++++++++++
 .../apache/kudu/client/TestConnectionCache.java |  20 +--
 .../org/apache/kudu/client/TestNegotiator.java  |  16 +-
 15 files changed, 872 insertions(+), 152 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/603c1578/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index a304d05..aec75e7 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -204,11 +204,13 @@ public class AsyncKuduClient implements AutoCloseable {
 
   private final SecurityContext securityContext;
 
+  /** A helper to facilitate re-acquiring of authentication token if current one expires. */
+  private final AuthnTokenReacquirer tokenReacquirer;
+
   private volatile boolean closed;
 
   private AsyncKuduClient(AsyncKuduClientBuilder b) {
     this.channelFactory = b.createChannelFactory();
-    this.securityContext = new SecurityContext(b.subject);
     this.masterAddresses = b.masterAddresses;
     this.masterTable = new KuduTable(this, MASTER_TABLE_NAME_PLACEHOLDER,
         MASTER_TABLE_NAME_PLACEHOLDER, null, null);
@@ -216,34 +218,57 @@ public class AsyncKuduClient implements AutoCloseable {
     this.defaultAdminOperationTimeoutMs = b.defaultAdminOperationTimeoutMs;
     this.defaultSocketReadTimeoutMs = b.defaultSocketReadTimeoutMs;
     this.statisticsDisabled = b.statisticsDisabled;
-    statistics = statisticsDisabled ? null : new Statistics();
+    this.statistics = statisticsDisabled ? null : new Statistics();
     this.timer = b.timer;
-    String clientId = UUID.randomUUID().toString().replace("-", "");
-    this.requestTracker = new RequestTracker(clientId);
+    this.requestTracker = new RequestTracker(UUID.randomUUID().toString().replace("-", ""));
+
+    this.securityContext = new SecurityContext(b.subject);
     this.connectionCache = new ConnectionCache(
         securityContext, defaultSocketReadTimeoutMs, timer, channelFactory);
+    this.tokenReacquirer = new AuthnTokenReacquirer(this);
   }
 
   /**
-   * Get a proxy to send RPC calls to the specified server.
+   * Get a proxy to send RPC calls to the specified server. The result proxy object does not
+   * restrict the type of credentials that may be used to connect to the server: it will use the
+   * secondary credentials if available, otherwise SASL credentials are used to authenticate
+   * the client when negotiating the connection to the server.
    *
    * @param serverInfo server's information
    * @return the proxy object bound to the target server
    */
   @Nonnull
   RpcProxy newRpcProxy(final ServerInfo serverInfo) {
-    Preconditions.checkNotNull(serverInfo);
-    return new RpcProxy(this, connectionCache.getConnection(serverInfo));
+    return newRpcProxy(serverInfo, Connection.CredentialsPolicy.ANY_CREDENTIALS);
+  }
+
+  /**
+   * Get a proxy to send RPC calls to the specified server. The result proxy object should use
+   * a connection to the server negotiated with the specified credentials policy.
+   *
+   * @param serverInfo target server information
+   * @param credentialsPolicy authentication credentials policy to use for the connection
+   *                          negotiation
+   * @return the proxy object bound to the target server
+   */
+  @Nonnull
+  private RpcProxy newRpcProxy(final ServerInfo serverInfo,
+                               Connection.CredentialsPolicy credentialsPolicy) {
+    final Connection connection = connectionCache.getConnection(serverInfo, credentialsPolicy);
+    return new RpcProxy(this, connection);
   }
 
   /**
    * Get a proxy to send RPC calls to Kudu master at the specified end-point.
    *
    * @param hostPort master end-point
+   * @param credentialsPolicy credentials policy to use for the connection negotiation to the target
+   *                          master server
    * @return the proxy object bound to the target master
    */
   @Nullable
-  RpcProxy newMasterRpcProxy(HostAndPort hostPort) {
+  RpcProxy newMasterRpcProxy(HostAndPort hostPort,
+                             Connection.CredentialsPolicy credentialsPolicy) {
     // We should have a UUID to construct ServerInfo for the master, but we have a chicken
     // and egg problem, we first need to communicate with the masters to find out about them,
     // and that's what we're trying to do. The UUID is just used for logging and cache key,
@@ -253,7 +278,43 @@ public class AsyncKuduClient implements AutoCloseable {
       // TODO(todd): should we log the resolution failure? throw an exception?
       return null;
     }
-    return newRpcProxy(new ServerInfo("master-" + hostPort.toString(), hostPort, inetAddress));
+    return newRpcProxy(
+        new ServerInfo("master-" + hostPort.toString(), hostPort, inetAddress), credentialsPolicy);
+  }
+
+  void reconnectToCluster(Callback<Void, Boolean> cb,
+                          Callback<Void, Exception> eb) {
+
+    final class ReconnectToClusterCB implements Callback<Void, ConnectToClusterResponse> {
+      private final Callback<Void, Boolean> cb;
+
+      ReconnectToClusterCB(Callback<Void, Boolean> cb) {
+        this.cb = Preconditions.checkNotNull(cb);
+      }
+
+      /**
+       * Report on the token re-acqusition results. The result authn token might be null: in that
+       * case the SASL credentials will be used to negotiate future connections.
+       */
+      @Override
+      public Void call(ConnectToClusterResponse resp) throws Exception {
+        final Master.ConnectToMasterResponsePB masterResponsePB = resp.getConnectResponse();
+        if (masterResponsePB.hasAuthnToken()) {
+          LOG.info("connect to master: received a new authn token");
+          securityContext.setAuthenticationToken(masterResponsePB.getAuthnToken());
+          cb.call(true);
+        } else {
+          LOG.warn("connect to master: received no authn token");
+          securityContext.setAuthenticationToken(null);
+          cb.call(false);
+        }
+        return null;
+      }
+    }
+
+    ConnectToCluster.run(masterTable, masterAddresses, null, defaultAdminOperationTimeoutMs,
+        Connection.CredentialsPolicy.PRIMARY_CREDENTIALS).addCallbacks(
+            new ReconnectToClusterCB(cb), eb);
   }
 
   /**
@@ -719,8 +780,7 @@ public class AsyncKuduClient implements AutoCloseable {
    * @return A deferred row.
    */
   Deferred<AsyncKuduScanner.Response> scanNextRows(final AsyncKuduScanner scanner) {
-    RemoteTablet tablet = scanner.currentTablet();
-    Preconditions.checkNotNull(tablet);
+    RemoteTablet tablet = Preconditions.checkNotNull(scanner.currentTablet());
     KuduRpc<AsyncKuduScanner.Response> nextRequest = scanner.getNextRowsRequest();
     // Important to increment the attempts before the next if statement since
     // getSleepTimeForRpc() relies on it if the client is null or dead.
@@ -733,7 +793,8 @@ public class AsyncKuduClient implements AutoCloseable {
     }
 
     Deferred<AsyncKuduScanner.Response> d = nextRequest.getDeferred();
-    RpcProxy.sendRpc(this, connectionCache.getConnection(info), nextRequest);
+    RpcProxy.sendRpc(this, connectionCache.getConnection(
+        info, Connection.CredentialsPolicy.ANY_CREDENTIALS), nextRequest);
     return d;
   }
 
@@ -757,7 +818,8 @@ public class AsyncKuduClient implements AutoCloseable {
 
     final Deferred<AsyncKuduScanner.Response> d = closeRequest.getDeferred();
     closeRequest.attempt++;
-    RpcProxy.sendRpc(this, connectionCache.getConnection(info), closeRequest);
+    RpcProxy.sendRpc(this, connectionCache.getConnection(
+        info, Connection.CredentialsPolicy.ANY_CREDENTIALS), closeRequest);
     return d;
   }
 
@@ -806,7 +868,8 @@ public class AsyncKuduClient implements AutoCloseable {
       if (info != null) {
         Deferred<R> d = request.getDeferred();
         request.setTablet(tablet);
-        RpcProxy.sendRpc(this, connectionCache.getConnection(info), request);
+        RpcProxy.sendRpc(this, connectionCache.getConnection(
+            info, Connection.CredentialsPolicy.ANY_CREDENTIALS), request);
         return d;
       }
     }
@@ -1066,15 +1129,14 @@ public class AsyncKuduClient implements AutoCloseable {
                                                           final KuduException cause) {
     String message;
     if (request.attempt > MAX_RPC_ATTEMPTS) {
-      message = "Too many attempts: ";
+      message = "too many attempts: ";
     } else {
-      message = "RPC can not complete before timeout: ";
+      message = "can not complete before timeout: ";
     }
     Status statusTimedOut = Status.TimedOut(message + request);
-    final Exception e = new NonRecoverableException(statusTimedOut, cause);
-    LOG.debug("Cannot continue with this RPC: {} because of: {}", request, message, e);
+    LOG.debug("Cannot continue with RPC because of: {}", statusTimedOut);
     Deferred<R> d = request.getDeferred();
-    request.errback(e);
+    request.errback(new NonRecoverableException(statusTimedOut, cause));
     return d;
   }
 
@@ -1136,14 +1198,13 @@ public class AsyncKuduClient implements AutoCloseable {
   Deferred<Master.GetTableLocationsResponsePB> getMasterTableLocationsPB(KuduRpc<?> parentRpc) {
     // TODO(todd): stop using this 'masterTable' hack.
     return ConnectToCluster.run(masterTable, masterAddresses, parentRpc,
-        defaultAdminOperationTimeoutMs).addCallback(
+        defaultAdminOperationTimeoutMs, Connection.CredentialsPolicy.ANY_CREDENTIALS).addCallback(
             new Callback<Master.GetTableLocationsResponsePB, ConnectToClusterResponse>() {
               @Override
               public Master.GetTableLocationsResponsePB call(ConnectToClusterResponse resp) {
-                // If the response has security info, adopt it.
                 if (resp.getConnectResponse().hasAuthnToken()) {
-                  securityContext.setAuthenticationToken(
-                      resp.getConnectResponse().getAuthnToken());
+                  // If the response has security info, adopt it.
+                  securityContext.setAuthenticationToken(resp.getConnectResponse().getAuthnToken());
                 }
                 List<ByteString> caCerts = resp.getConnectResponse().getCaCertDerList();
                 if (!caCerts.isEmpty()) {
@@ -1298,6 +1359,32 @@ public class AsyncKuduClient implements AutoCloseable {
   }
 
   /**
+   * Same as {@link #handleRetryableError(KuduRpc, KuduException)}, but without the delay before
+   * retrying the RPC.
+   *
+   * @param rpc the RPC to retry
+   * @param ex the exception which lead to the attempt of RPC retry
+   */
+  <R> void handleRetryableErrorNoDelay(final KuduRpc<R> rpc, KuduException ex) {
+    if (cannotRetryRequest(rpc)) {
+      tooManyAttemptsOrTimeout(rpc, ex);
+      return;
+    }
+    sendRpcToTablet(rpc);
+  }
+
+  /**
+   * Handle a RPC failed due to invalid authn token error. In short, connect to the Kudu cluster
+   * to acquire a new authentication token and retry the RPC once a new authentication token
+   * is put into the {@link #securityContext}.
+   *
+   * @param rpc the RPC which failed do to invalid authn token
+   */
+  <R> void handleInvalidToken(KuduRpc<R> rpc) {
+    tokenReacquirer.handleAuthnTokenExpiration(rpc);
+  }
+
+  /**
    * This methods enable putting RPCs on hold for a period of time determined by
    * {@link #getSleepTimeForRpc(KuduRpc)}. If the RPC is out of time/retries, its errback will
    * be immediately called.
@@ -1560,7 +1647,7 @@ public class AsyncKuduClient implements AutoCloseable {
           public Deferred<LocatedTablet> call(List<LocatedTablet> tablets) {
             Preconditions.checkArgument(tablets.size() <= 1,
                                         "found more than one tablet for a single partition key");
-            if (tablets.size() == 0) {
+            if (tablets.isEmpty()) {
               // Most likely this indicates a non-covered range, but since this
               // could race with an alter table partitioning operation (which
               // clears the local table locations cache), we check again.

http://git-wip-us.apache.org/repos/asf/kudu/blob/603c1578/java/kudu-client/src/main/java/org/apache/kudu/client/AuthnTokenReacquirer.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AuthnTokenReacquirer.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AuthnTokenReacquirer.java
new file mode 100644
index 0000000..8f4811b
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AuthnTokenReacquirer.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kudu.client;
+
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.concurrent.GuardedBy;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.stumbleupon.async.Callback;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * An utility class to reacquire authentication token when the current one expires.
+ */
+@InterfaceAudience.Private
+final class AuthnTokenReacquirer {
+  /** The Kudu client object the AuthnTokenReacquirer is bound to. */
+  private final AsyncKuduClient client;
+
+  /** A dedicated synchronization object for #queuedRpcs */
+  private final Object queuedRpcsLock = new Object();
+
+  /**
+   * Container to store information on RPCs affected by authn token expiration error. The RPCs
+   * will be retried on successful token re-acquisition attempt or their errback() method
+   * will be called if authn token re-acquisition fails.
+   */
+  @GuardedBy("queuedRpcsLock")
+  private ArrayList<KuduRpc<?>> queuedRpcs = Lists.newArrayList();
+
+  /**
+   * Create a new AuthnTokenReacquirer object.
+   *
+   * @param client the Kudu client object
+   */
+  AuthnTokenReacquirer(AsyncKuduClient client) {
+    this.client = client;
+  }
+
+  /**
+   * Add information on the RPC which failed due to expired authentication token and requires a new
+   * authn token to retry. Calling this method triggers authn token re-acquisition if there is not
+   * active one yet.
+   *
+   * @param rpc the RPC which failed due to the expired authn token error
+   */
+  <R> void handleAuthnTokenExpiration(KuduRpc<R> rpc) {
+    boolean doReacquire = false;
+    synchronized (queuedRpcsLock) {
+      if (queuedRpcs.isEmpty()) {
+        // Using non-emptiness of the container as a state here. If the container is empty, that
+        // means a new re-acquisition round should be started.  If the container is not empty,
+        // that means the process of token re-acquisition has already been already and the
+        // elements of the #queuedRpcs container should be processed once a new token
+        // re-acquisition completes (it could succeed or fail).
+        //
+        // TODO(aserbin): introduce a timestamp for the recently acquired authn token, so it would
+        //   not try to re-acquire a token too often if there is a race between clearing
+        //   the container after token acquisition is completed and scheduling token re-acquisition.
+        doReacquire = true;
+      }
+      queuedRpcs.add(rpc);
+    }
+    rpc.addTrace(new RpcTraceFrame.RpcTraceFrameBuilder(
+        rpc.method(), RpcTraceFrame.Action.GET_NEW_AUTHENTICATION_TOKEN_THEN_RETRY)
+        .build());
+
+    if (doReacquire) {
+      reacquireAuthnToken();
+    }
+  }
+
+  private List<KuduRpc<?>> swapQueuedRpcs() {
+    List<KuduRpc<?>> rpcList;
+    synchronized (queuedRpcsLock) {
+      rpcList = queuedRpcs;
+      queuedRpcs = Lists.newArrayList();
+    }
+    assert !rpcList.isEmpty();
+    return rpcList;
+  }
+
+  private void reacquireAuthnToken() {
+
+    /**
+     * An utility class providing callbacks for successful completion of authn token re-acqusition.
+     */
+    final class NewAuthnTokenCB implements Callback<Void, Boolean> {
+      /**
+       * Callback upon 'successful' completion of an attempt to acquire a new token,
+       * i.e. an attempt where no exception detected in the code path.
+       *
+       * @param tokenAcquired {@code true} if a new token acquired, {@code false} if
+       *                      the ConnectToCluster yielded no authn token.
+       */
+      @Override
+      public Void call(Boolean tokenAcquired) throws Exception {
+        // TODO(aserbin): do we need to handle a successful re-connect with no token some other way?
+        retryQueuedRpcs();
+        return null;
+      }
+
+      /**
+       * Handle the affected RPCs on the completion of authn token re-acquisition. The result authn
+       * token might be null, so in that case primary credentials will be used for future
+       * connection negotiations.
+       */
+      void retryQueuedRpcs() {
+        List<KuduRpc<?>> list = swapQueuedRpcs();
+        for (KuduRpc<?> rpc : list) {
+          client.handleRetryableErrorNoDelay(rpc, null);
+        }
+      }
+    }
+
+    /**
+     * Errback to retry authn token re-acquisition and notify the handle the affected RPCs if the
+     * re-acquisition failed after some number of retries (currently, it's 5 attempts).
+     *
+     * TODO(aserbin): perhaps we should retry indefinitely with increasing backoff, but aggressively
+     *                timeout RPCs in the queue after each failure.
+     */
+    final class NewAuthnTokenErrB implements Callback<Void, Exception> {
+      private static final int MAX_ATTEMPTS = 5;
+      private final NewAuthnTokenCB cb;
+      private int attempts = 0;
+
+      NewAuthnTokenErrB(NewAuthnTokenCB cb) {
+        this.cb = cb;
+      }
+
+      @Override
+      public Void call(Exception e) {
+        if (e instanceof RecoverableException && attempts < MAX_ATTEMPTS) {
+          client.reconnectToCluster(cb, this);
+          ++attempts;
+          return null;
+        }
+
+        failQueuedRpcs();
+        return null;
+      }
+
+      /** Handle the affected RPCs if authn token re-acquisition fails. */
+      void failQueuedRpcs() {
+        List<KuduRpc<?>> rpcList = swapQueuedRpcs();
+        for (KuduRpc<?> rpc : rpcList) {
+          Exception reason = new NonRecoverableException(Status.NotAuthorized(String.format(
+              "cannot re-acquire authentication token after %d attempts", MAX_ATTEMPTS)));
+          rpc.errback(reason);
+        }
+      }
+    }
+
+    final NewAuthnTokenCB newTokenCb = new NewAuthnTokenCB();
+    final NewAuthnTokenErrB newTokenErrb = new NewAuthnTokenErrB(newTokenCb);
+    client.reconnectToCluster(newTokenCb, newTokenErrb);
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/603c1578/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
index dfba55d..3f9c9e5 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
@@ -131,13 +131,15 @@ final class ConnectToCluster {
    * @param masterAddresses the addresses of masters to fetch from
    * @param parentRpc RPC that prompted a master lookup, can be null
    * @param defaultTimeoutMs timeout to use for RPCs if the parentRpc has no timeout
+   * @param credentialsPolicy credentials policy to use for connection negotiation
    * @return a Deferred object for the cluster connection status
    */
   public static Deferred<ConnectToClusterResponse> run(
       KuduTable masterTable,
       List<HostAndPort> masterAddresses,
       KuduRpc<?> parentRpc,
-      long defaultTimeoutMs) {
+      long defaultTimeoutMs,
+      Connection.CredentialsPolicy credentialsPolicy) {
     ConnectToCluster connector = new ConnectToCluster(masterAddresses);
 
     // Try to connect to each master. The ConnectToCluster instance
@@ -145,7 +147,8 @@ final class ConnectToCluster {
     // deferred.
     for (HostAndPort hostAndPort : masterAddresses) {
       Deferred<ConnectToMasterResponsePB> d;
-      RpcProxy proxy = masterTable.getAsyncClient().newMasterRpcProxy(hostAndPort);
+      RpcProxy proxy = masterTable.getAsyncClient().newMasterRpcProxy(
+          hostAndPort, credentialsPolicy);
       if (proxy != null) {
         d = connectToMaster(masterTable, proxy, parentRpc, defaultTimeoutMs);
       } else {

http://git-wip-us.apache.org/repos/asf/kudu/blob/603c1578/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
index eea1b82..5034e5b 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
@@ -60,7 +60,6 @@ import org.jboss.netty.util.HashedWheelTimer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.kudu.client.Negotiator.Result;
 import org.apache.kudu.rpc.RpcHeader;
 import org.apache.kudu.rpc.RpcHeader.RpcFeatureFlag;
 
@@ -77,23 +76,46 @@ import org.apache.kudu.rpc.RpcHeader.RpcFeatureFlag;
  * Acquiring the monitor on an object of this class will prevent it from
  * accepting write requests as well as buffering requests if the underlying
  * channel isn't connected.
+ *
  * TODO(aserbin) clarify on the socketReadTimeoutMs and using per-RPC timeout settings.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 class Connection extends SimpleChannelUpstreamHandler {
+  /**
+   * Authentication credentials policy for negotiating outbound connections. Some requests
+   * (e.g. {@link ConnectToMasterRequest}) behave differently depending on the type of credentials
+   * used for authentication when negotiating on the underlying connection. If some particular
+   * behavior is required, it's necessary to specify appropriate credentials policy while creating
+   * an instance of this object.
+   */
+  public enum CredentialsPolicy {
+    /** It's acceptable to use authentication credentials of any type, primary or secondary ones. */
+    ANY_CREDENTIALS,
+
+    /**
+     * Only primary credentials are acceptable. Primary credentials are Kerberos tickets,
+     * TLS certificate. Secondary credentials are authentication tokens: they are 'derived'
+     * in the sense that it's possible to acquire them using 'primary' credentials.
+     */
+    PRIMARY_CREDENTIALS,
+  }
+
   /** Information on the target server. */
   private final ServerInfo serverInfo;
 
   /** Security context to use for connection negotiation. */
   private final SecurityContext securityContext;
 
-  /** Read timeout for the connection (used by Netty's ReadTimeoutHandler) */
+  /** Read timeout for the connection (used by Netty's ReadTimeoutHandler). */
   private final long socketReadTimeoutMs;
 
-  /** Timer to monitor read timeouts for the connection (used by Netty's ReadTimeoutHandler) */
+  /** Timer to monitor read timeouts for the connection (used by Netty's ReadTimeoutHandler). */
   private final HashedWheelTimer timer;
 
+  /** Credentials policy to use when authenticating. */
+  private final CredentialsPolicy credentialsPolicy;
+
   /** The underlying Netty's socket channel. */
   private final SocketChannel channel;
 
@@ -118,7 +140,7 @@ class Connection extends SimpleChannelUpstreamHandler {
   /** Lock to guard access to some of the fields below. */
   private final ReentrantLock lock = new ReentrantLock();
 
-  /** A state of this object. */
+  /** The current state of this Connection object. */
   @GuardedBy("lock")
   private State state;
 
@@ -135,24 +157,43 @@ class Connection extends SimpleChannelUpstreamHandler {
   @GuardedBy("lock")
   private ArrayList<QueuedMessage> queuedMessages = Lists.newArrayList();
 
-  /** The result of the connection negotiation. */
+  /** The result of the successful connection negotiation. */
   @GuardedBy("lock")
-  private Result negotiationResult = null;
+  private Negotiator.Success negotiationResult = null;
+
+  /** The result of failed connection negotiation. */
+  @GuardedBy("lock")
+  private Negotiator.Failure negotiationFailure = null;
 
   /** A monotonically increasing counter for RPC IDs. */
   @GuardedBy("lock")
   private int nextCallId = 0;
 
+  /**
+   * Create a new Connection object to the specified destination.
+   *
+   * @param serverInfo the destination server
+   * @param securityContext security context to use for connection negotiation
+   * @param socketReadTimeoutMs timeout for the read operations on the socket
+   * @param timer timer to set up read timeout on the corresponding Netty channel
+   * @param channelFactory Netty factory to create corresponding Netty channel
+   * @param credentialsPolicy policy controlling which credentials to use while negotiating on the
+   *                          connection to the target server:
+   *                          if {@link CredentialsPolicy#PRIMARY_CREDENTIALS}, the authentication
+   *                          token from the security context is ignored
+   */
   Connection(ServerInfo serverInfo,
              SecurityContext securityContext,
              long socketReadTimeoutMs,
              HashedWheelTimer timer,
-             ClientSocketChannelFactory channelFactory) {
+             ClientSocketChannelFactory channelFactory,
+             CredentialsPolicy credentialsPolicy) {
     this.serverInfo = serverInfo;
     this.securityContext = securityContext;
     this.state = State.NEW;
     this.socketReadTimeoutMs = socketReadTimeoutMs;
     this.timer = timer;
+    this.credentialsPolicy = credentialsPolicy;
 
     final ConnectionPipeline pipeline = new ConnectionPipeline();
     pipeline.init();
@@ -179,7 +220,8 @@ class Connection extends SimpleChannelUpstreamHandler {
       lock.unlock();
     }
     Channels.write(channel, ChannelBuffers.wrappedBuffer(CONNECTION_HEADER));
-    Negotiator negotiator = new Negotiator(serverInfo.getHostname(), securityContext);
+    Negotiator negotiator = new Negotiator(serverInfo.getHostname(), securityContext,
+        (credentialsPolicy == CredentialsPolicy.PRIMARY_CREDENTIALS));
     ctx.getPipeline().addBefore(ctx.getName(), "negotiation", negotiator);
     negotiator.sendHello(channel);
   }
@@ -198,7 +240,7 @@ class Connection extends SimpleChannelUpstreamHandler {
   @Override
   public void channelDisconnected(final ChannelHandlerContext ctx,
                                   final ChannelStateEvent e) throws Exception {
-    // No need to call super.channelClosed(ctx, e) -- there should be nobody in the upstream
+    // No need to call super.channelDisconnected(ctx, e) -- there should be nobody in the upstream
     // pipeline after Connection itself. So, just handle the disconnection event ourselves.
     cleanup("connection disconnected");
   }
@@ -216,15 +258,17 @@ class Connection extends SimpleChannelUpstreamHandler {
   @Override
   public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt) throws Exception {
     Object m = evt.getMessage();
-    if (m instanceof Negotiator.Result) {
+
+    // Process the results of a successful negotiation.
+    if (m instanceof Negotiator.Success) {
       lock.lock();
       try {
         Preconditions.checkState(state == State.NEGOTIATING);
-        Preconditions.checkNotNull(queuedMessages);
+        Preconditions.checkState(inflightMessages.isEmpty());
 
         state = State.READY;
-        negotiationResult = (Negotiator.Result) m;
-        List<QueuedMessage> queued = queuedMessages;
+        negotiationResult = (Negotiator.Success) m;
+        List<QueuedMessage> queued = Preconditions.checkNotNull(queuedMessages);
         // The queuedMessages should not be used anymore once the connection is negotiated.
         queuedMessages = null;
         // Send out all the enqueued messages. This is done while holding the lock to preserve
@@ -239,6 +283,24 @@ class Connection extends SimpleChannelUpstreamHandler {
       return;
     }
 
+    // Process the results of a failed negotiation.
+    if (m instanceof Negotiator.Failure) {
+      lock.lock();
+      try {
+        Preconditions.checkState(state == State.NEGOTIATING);
+        Preconditions.checkState(inflightMessages.isEmpty());
+
+        state = State.NEGOTIATION_FAILED;
+        negotiationFailure = (Negotiator.Failure) m;
+      } finally {
+        lock.unlock();
+      }
+      // Calling Channels.close() triggers the cleanup() which will handle the negotiation
+      // failure appropriately.
+      Channels.close(evt.getChannel());
+      return;
+    }
+
     // Some other event which the connection does not handle.
     if (!(m instanceof CallResponse)) {
       ctx.sendUpstream(evt);
@@ -320,10 +382,13 @@ class Connection extends SimpleChannelUpstreamHandler {
     } else {
       LOG.error("{} unexpected exception from downstream on {}: {}", getLogPrefix(), c, e);
     }
+
     if (c.isOpen()) {
-      Channels.close(c);        // Will trigger channelClosed(), which will cleanup()
-    } else {                    // else: presumably a connection timeout.
-      cleanup(e.getMessage());  // => need to cleanup() from here directly.
+      // Calling Channels.close() will trigger channelClosed(), which will call cleanup().
+      Channels.close(c);
+    } else {
+      // Presumably a connection timeout: initiating the clean-up directly from here.
+      cleanup(e.getMessage());
     }
   }
 
@@ -332,13 +397,16 @@ class Connection extends SimpleChannelUpstreamHandler {
     return serverInfo;
   }
 
-  /**
-   * @return true iff the connection is in the DISCONNECTED state
-   */
-  boolean isDisconnected() {
+  /** The credentials policy used for the connection negotiation. */
+  CredentialsPolicy getCredentialsPolicy() {
+    return credentialsPolicy;
+  }
+
+  /** @return true iff the connection is in the TERMINATED state */
+  boolean isTerminated() {
     lock.lock();
     try {
-      return state == State.DISCONNECTED;
+      return state == State.TERMINATED;
     } finally {
       lock.unlock();
     }
@@ -363,9 +431,7 @@ class Connection extends SimpleChannelUpstreamHandler {
     return features;
   }
 
-  /**
-   * @return string representation of the peer (i.e. the server) information suitable for logging
-   */
+  /** @return string representation of the peer information suitable for logging */
   String getLogPrefix() {
     return "[peer " + serverInfo.getUuid() + "]";
   }
@@ -380,10 +446,9 @@ class Connection extends SimpleChannelUpstreamHandler {
       throws RecoverableException {
     lock.lock();
     try {
-      if (state == State.DISCONNECTED) {
+      if (state == State.TERMINATED) {
         // The upper-level caller should handle the exception and retry using a new connection.
-        throw new RecoverableException(Status.IllegalState(
-            "connection in DISCONNECTED state; cannot enqueue a message"));
+        throw new RecoverableException(Status.IllegalState("connection is terminated"));
       }
 
       if (state == State.NEW) {
@@ -457,9 +522,7 @@ class Connection extends SimpleChannelUpstreamHandler {
     return d;
   }
 
-  /**
-   * @return string representation of this object (suitable for printing into the logs, etc.)
-   */
+  /** @return string representation of this object (suitable for printing into the logs, etc.) */
   public String toString() {
     final StringBuilder buf = new StringBuilder();
     buf.append("Connection@")
@@ -503,14 +566,13 @@ class Connection extends SimpleChannelUpstreamHandler {
   private void sendCallToWire(final RpcOutboundMessage msg, Callback<Void, CallResponseInfo> cb) {
     Preconditions.checkState(lock.isHeldByCurrentThread());
     Preconditions.checkState(state == State.READY);
-    Preconditions.checkNotNull(inflightMessages);
 
     if (LOG.isTraceEnabled()) {
       LOG.trace("{} sending {}", getLogPrefix(), msg);
     }
     final int callId = msg.getHeaderBuilder().getCallId();
     final Callback<Void, CallResponseInfo> empty = inflightMessages.put(callId, cb);
-    Preconditions.checkArgument(empty == null);
+    Preconditions.checkState(empty == null);
     Channels.write(channel, msg);
   }
 
@@ -526,13 +588,22 @@ class Connection extends SimpleChannelUpstreamHandler {
     List<QueuedMessage> queued;
     Map<Integer, Callback<Void, CallResponseInfo>> inflight;
 
+    boolean needNewAuthnToken = false;
     lock.lock();
     try {
-      if (state == State.DISCONNECTED) {
+      if (state == State.TERMINATED) {
+        // The cleanup has already run.
         Preconditions.checkState(queuedMessages == null);
         Preconditions.checkState(inflightMessages == null);
         return;
       }
+      if (state == State.NEGOTIATION_FAILED) {
+        Preconditions.checkState(negotiationFailure != null);
+        Preconditions.checkState(inflightMessages.isEmpty());
+        needNewAuthnToken = negotiationFailure.status.getCode().equals(
+            RpcHeader.ErrorStatusPB.RpcErrorCodePB.FATAL_INVALID_AUTHENTICATION_TOKEN);
+      }
+      LOG.debug("{} cleaning up while in state {} due to: {}", getLogPrefix(), state, errorMessage);
 
       queued = queuedMessages;
       queuedMessages = null;
@@ -540,13 +611,14 @@ class Connection extends SimpleChannelUpstreamHandler {
       inflight = inflightMessages;
       inflightMessages = null;
 
-      state = State.DISCONNECTED;
+      state = State.TERMINATED;
     } finally {
       lock.unlock();
     }
     final Status error = Status.NetworkError(getLogPrefix() + " " +
         (errorMessage == null ? "connection reset" : errorMessage));
-    final RecoverableException exception = new RecoverableException(error);
+    final RecoverableException exception =
+        needNewAuthnToken ? new InvalidAuthnTokenException(error) : new RecoverableException(error);
 
     for (Callback<Void, CallResponseInfo> cb : inflight.values()) {
       try {
@@ -575,13 +647,36 @@ class Connection extends SimpleChannelUpstreamHandler {
     channel.connect(new InetSocketAddress(serverInfo.getResolvedAddress(), serverInfo.getPort()));
   }
 
-  /** State of the Connection object. */
+  /** Enumeration to represent the internal state of the Connection object. */
   private enum State {
-    NEW,          // The object has just been created.
-    CONNECTING,   // The establishment of TCP connection to the server has started.
-    NEGOTIATING,  // The connection negotiation has started.
-    READY,        // The connection to the server has been opened, negotiated, and ready to use.
-    DISCONNECTED, // The TCP connection has been dropped off.
+    /** The object has just been created. */
+    NEW,
+
+    /** The establishment of TCP connection to the server has started. */
+    CONNECTING,
+
+    /** The connection negotiation has started. */
+    NEGOTIATING,
+
+    /**
+     * The underlying TCP connection has been dropped off due to negotiation error and there are
+     * enqueued messages to handle. Once connection negotiation fails, the Connection object
+     * handles the affected queued RPCs appropriately. If the negotiation failed due to invalid
+     * authn token error, the upper-level code may attempt to acquire a new authentication token
+     * in that case. The connection transitions into the TERMINATED state upon notifying the
+     * affected RPCs on the connection negotiation failure.
+     */
+    NEGOTIATION_FAILED,
+
+    /** The connection to the server is opened, negotiated, and ready to use. */
+    READY,
+
+    /**
+     * The TCP connection has been dropped off, the proper clean-up procedure has run and no queued
+     * nor in-flight messages are left. In this state, the object does not accept new messages,
+     * throwing RecoverableException upon call of the enqueueMessage() method.
+     */
+    TERMINATED,
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kudu/blob/603c1578/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
index 844e79f..53e8e59 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
@@ -18,12 +18,13 @@
 package org.apache.kudu.client;
 
 import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.Set;
 import javax.annotation.concurrent.GuardedBy;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableList;
 import com.stumbleupon.async.Deferred;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -37,16 +38,20 @@ import org.jboss.netty.util.HashedWheelTimer;
  * There should only be one instance of ConnectionCache per Kudu client, and it should not be
  * shared between clients.
  * <p>
- * Disconnected instances of the {@link Connection} class are replaced in the cache with instances
- * when {@link #getConnection(ServerInfo)) method is called with the same destination. Since the map
- * is keyed by UUID of the server, it would require an ever-growing set of unique Kudu servers
- * to encounter memory issues.
+ * Disconnected instances of the {@link Connection} class are replaced in the cache with new ones
+ * when {@link #getConnection(ServerInfo, Connection.CredentialsPolicy)} method is called with the
+ * same destination and matching credentials policy. Since the map is keyed by the UUID of the
+ * target server, the theoretical maximum number of elements in the cache is twice the number of
+ * all servers in the cluster (i.e. both masters and tablet servers). However, in practice it's
+ * 2 * number of masters + number of tablet servers since tablet servers do not require connections
+ * negotiated with primary credentials.
  *
  * This class is thread-safe.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 class ConnectionCache {
+
   /** Security context to use for connection negotiation. */
   private final SecurityContext securityContext;
 
@@ -59,15 +64,15 @@ class ConnectionCache {
   /** Netty's channel factory to use by connections. */
   private final ClientSocketChannelFactory channelFactory;
 
-  /** Synchronization primitive to guard access to the fields below. */
-  private final ReentrantLock lock = new ReentrantLock();
-
-  @GuardedBy("lock")
-  private final HashMap<String, Connection> uuid2connection = new HashMap<>();
-
   /**
-   * Create a new empty ConnectionCache given the specified parameters.
+   * Container mapping server UUID into the established connection from the client to the server.
+   * It may be up to two connections per server: one established with secondary credentials
+   * (e.g. authn token), another with primary ones (e.g. Kerberos credentials).
    */
+  @GuardedBy("uuid2connection")
+  private final HashMultimap<String, Connection> uuid2connection = HashMultimap.create();
+
+  /** Create a new empty ConnectionCache given the specified parameters. */
   ConnectionCache(SecurityContext securityContext,
                   long socketReadTimeoutMs,
                   HashedWheelTimer timer,
@@ -84,59 +89,81 @@ class ConnectionCache {
    * created connection is not negotiated until enqueuing the first RPC to the target server.
    *
    * @param serverInfo the server end-point to connect to
+   * @param credentialsPolicy authentication credentials policy for the connection negotiation
    * @return instance of this object with the specified destination
    */
-  public Connection getConnection(final ServerInfo serverInfo) {
-    Connection connection;
-
-    lock.lock();
-    try {
-      // First try to find an existing connection.
-      connection = uuid2connection.get(serverInfo.getUuid());
-      if (connection == null || connection.isDisconnected()) {
-        // If no valid connection is found, create a new one.
-        connection = new Connection(serverInfo, securityContext,
-            socketReadTimeoutMs, timer, channelFactory);
-        uuid2connection.put(serverInfo.getUuid(), connection);
+  public Connection getConnection(final ServerInfo serverInfo,
+                                  Connection.CredentialsPolicy credentialsPolicy) {
+    Connection result = null;
+    synchronized (uuid2connection) {
+      // Create and register a new connection object into the cache if one of the following is true:
+      //
+      //  * There isn't a registered connection to the specified destination.
+      //
+      //  * There is a connection to the specified destination, but it's in TERMINATED state.
+      //    Such connections cannot be used again and should be recycled. The connection cache
+      //    lazily removes such entries.
+      //
+      //  * A connection negotiated with primary credentials is requested but the only registered
+      //    one does not have such property. In this case, the already existing connection
+      //    (negotiated with secondary credentials, i.e. authn token) is kept in the cache and
+      //    a new one is created to be open and negotiated with primary credentials. The newly
+      //    created connection is put into the cache along with old one. We don't do anything
+      //    special to the old connection to shut it down since it may be still in use. We rely
+      //    on the server to close inactive connections in accordance with their TTL settings.
+      //
+      final Set<Connection> connections = uuid2connection.get(serverInfo.getUuid());
+      Iterator<Connection> it = connections.iterator();
+      while (it.hasNext()) {
+        Connection c = it.next();
+        if (c.isTerminated()) {
+          // Lazy recycling of the terminated connections: removing them from the cache upon
+          // an attempt to connect to the same destination again.
+          it.remove();
+          continue;
+        }
+        if (credentialsPolicy == Connection.CredentialsPolicy.ANY_CREDENTIALS ||
+            credentialsPolicy == c.getCredentialsPolicy()) {
+          // If the connection policy allows for using any credentials or the connection is
+          // negotiated using the given credentials type, this is the connection we are looking for.
+          result = c;
+        }
+      }
+      if (result == null) {
+        result = new Connection(serverInfo, securityContext,
+            socketReadTimeoutMs, timer, channelFactory, credentialsPolicy);
+        connections.add(result);
+        // There can be at most 2 connections to the same destination: one with primary and another
+        // with secondary credentials.
+        assert connections.size() <= 2;
       }
-    } finally {
-      lock.unlock();
     }
 
-    return connection;
+    return result;
   }
 
-  /**
-   * Asynchronously terminate every connection. This also cancels all the pending and in-flight
-   * RPCs.
-   */
+  /** Asynchronously terminate every connection. This cancels all the pending and in-flight RPCs. */
   Deferred<ArrayList<Void>> disconnectEverything() {
-    lock.lock();
-    try {
-      ArrayList<Deferred<Void>> deferreds = new ArrayList<>(uuid2connection.size());
+    synchronized (uuid2connection) {
+      List<Deferred<Void>> deferreds = new ArrayList<>(uuid2connection.size());
       for (Connection c : uuid2connection.values()) {
         deferreds.add(c.shutdown());
       }
       return Deferred.group(deferreds);
-    } finally {
-      lock.unlock();
     }
   }
 
   /**
    * Return a copy of the all-connections-list. This method is exposed only to allow
-   * {@ref AsyncKuduClient} to forward it, so tests could get access to the underlying elements
+   * {@link AsyncKuduClient} to forward it, so tests could get access to the underlying elements
    * of the cache.
    *
    * @return a copy of the list of all connections in the connection cache
    */
   @VisibleForTesting
   List<Connection> getConnectionListCopy() {
-    lock.lock();
-    try {
+    synchronized (uuid2connection) {
       return ImmutableList.copyOf(uuid2connection.values());
-    } finally {
-      lock.unlock();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/603c1578/java/kudu-client/src/main/java/org/apache/kudu/client/InvalidAuthnTokenException.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/InvalidAuthnTokenException.java b/java/kudu-client/src/main/java/org/apache/kudu/client/InvalidAuthnTokenException.java
new file mode 100644
index 0000000..2f88414
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/InvalidAuthnTokenException.java
@@ -0,0 +1,39 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kudu.client;
+
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+/**
+ * Exception for notifying on invalid authn token. In most use cases in the Kudu Java client code,
+ * 'invalid authn token' means 'expired authn token'. Receiving this exception means the current
+ * authentication token is no longer valid and a new one is needed to establish connections to
+ * the Kudu servers for sending RPCs.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+class InvalidAuthnTokenException extends RecoverableException {
+  /**
+   * @param status status object containing the reason for the exception trace
+   */
+  InvalidAuthnTokenException(Status status) {
+    super(status);
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/603c1578/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
index a917af2..4736b37 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
@@ -84,11 +84,11 @@ import org.apache.kudu.util.SecurityUtil;
 /**
  * Netty Pipeline handler which runs connection negotiation with
  * the server. When negotiation is complete, this removes itself
- * from the pipeline and fires a Negotiator.Result upstream.
+ * from the pipeline and fires a Negotiator.Success or Negotiator.Failure upstream.
  */
 @InterfaceAudience.Private
 public class Negotiator extends SimpleChannelUpstreamHandler {
-  static final Logger LOG = LoggerFactory.getLogger(Negotiator.class);
+  private static final Logger LOG = LoggerFactory.getLogger(Negotiator.class);
 
   private static final SaslClientCallbackHandler SASL_CALLBACK = new SaslClientCallbackHandler();
   private static final Set<RpcHeader.RpcFeatureFlag> SUPPORTED_RPC_FEATURES =
@@ -164,10 +164,16 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
   @VisibleForTesting
   boolean overrideLoopbackForTests;
 
-  public Negotiator(String remoteHostname, SecurityContext securityContext) {
+  public Negotiator(String remoteHostname,
+                    SecurityContext securityContext,
+                    boolean ignoreAuthnToken) {
     this.remoteHostname = remoteHostname;
     this.securityContext = securityContext;
-    this.authnToken = securityContext.getAuthenticationToken();
+    if (ignoreAuthnToken) {
+      this.authnToken = null;
+    } else {
+      this.authnToken = securityContext.getAuthenticationToken();
+    }
   }
 
   public void sendHello(Channel channel) {
@@ -227,9 +233,22 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
 
   private void handleResponse(Channel chan, CallResponse callResponse)
       throws IOException {
-    // TODO(todd): this needs to handle error responses, not just success responses.
-    RpcHeader.NegotiatePB response = parseSaslMsgResponse(callResponse);
+    final RpcHeader.ResponseHeader header = callResponse.getHeader();
+    if (header.getIsError()) {
+      final RpcHeader.ErrorStatusPB.Builder errBuilder = RpcHeader.ErrorStatusPB.newBuilder();
+      KuduRpc.readProtobuf(callResponse.getPBMessage(), errBuilder);
+      final RpcHeader.ErrorStatusPB error = errBuilder.build();
+      LOG.debug("peer {} sent connection negotiation error: {}",
+          chan.getRemoteAddress(), error.getMessage());
+
+      // The upstream code should handle the negotiation failure.
+      state = State.FINISHED;
+      chan.getPipeline().remove(this);
+      Channels.fireMessageReceived(chan, new Failure(error));
+      return;
+    }
 
+    RpcHeader.NegotiatePB response = parseSaslMsgResponse(callResponse);
     // TODO: check that the message type matches the expected one in all
     // of the below implementations.
     switch (state) {
@@ -490,7 +509,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
       // TODO(danburkert): is this a correct assumption? would the
       // client ever be "done" but also produce handshake data?
       // if it did, would we want to encrypt the SSL message or no?
-      assert data.size() == 0;
+      assert data.isEmpty();
       return false;
     } else {
       assert data.size() > 0;
@@ -621,7 +640,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
     chan.getPipeline().remove(this);
 
     Channels.write(chan, makeConnectionContext());
-    Channels.fireMessageReceived(chan, new Result(serverFeatures));
+    Channels.fireMessageReceived(chan, new Success(serverFeatures));
   }
 
   private RpcOutboundMessage makeConnectionContext() throws SaslException {
@@ -686,11 +705,24 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
    * The results of a successful negotiation. This is sent to upstream handlers in the
    * Netty pipeline after negotiation completes.
    */
-  static class Result {
+  static class Success {
     final Set<RpcFeatureFlag> serverFeatures;
 
-    public Result(Set<RpcFeatureFlag> serverFeatures) {
+    public Success(Set<RpcFeatureFlag> serverFeatures) {
       this.serverFeatures = serverFeatures;
     }
   }
+
+  /**
+   * The results of a failed negotiation. This is sent to upstream handlers in the Netty pipeline
+   * when a negotiation fails.
+   */
+  static class Failure {
+    /** The RPC error received from the server. */
+    final RpcHeader.ErrorStatusPB status;
+
+    public Failure(RpcHeader.ErrorStatusPB status) {
+      this.status = status;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/603c1578/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
index 3f8de9b..42379ee 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcProxy.java
@@ -27,6 +27,7 @@
 package org.apache.kudu.client;
 
 import java.util.Set;
+import javax.annotation.Nonnull;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -56,14 +57,16 @@ import org.apache.kudu.util.Pair;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public class RpcProxy {
+class RpcProxy {
 
   private static final Logger LOG = LoggerFactory.getLogger(RpcProxy.class);
 
   /** The reference to the top-level Kudu client object. */
+  @Nonnull
   private final AsyncKuduClient client;
 
   /** The reference to the object representing connection to the target server. */
+  @Nonnull
   private final Connection connection;
 
   /**
@@ -73,10 +76,8 @@ public class RpcProxy {
    * @param connection the connection associated with the target Kudu server
    */
   RpcProxy(AsyncKuduClient client, Connection connection) {
-    Preconditions.checkNotNull(client);
-    Preconditions.checkNotNull(connection);
-    this.client = client;
-    this.connection = connection;
+    this.client = Preconditions.checkNotNull(client);
+    this.connection = Preconditions.checkNotNull(connection);
   }
 
   /**
@@ -138,7 +139,7 @@ public class RpcProxy {
           });
     } catch (RecoverableException e) {
       // This is to handle RecoverableException(Status.IllegalState()) from
-      // Connection.enqueueMessage() if the connection turned into the DISCONNECTED state.
+      // Connection.enqueueMessage() if the connection turned into the TERMINATED state.
       client.handleRetryableError(rpc, e);
     } catch (Exception e) {
       rpc.errback(e);
@@ -190,8 +191,6 @@ public class RpcProxy {
                                            final KuduRpc<R> rpc,
                                            CallResponse response,
                                            KuduException ex) {
-    Preconditions.checkNotNull(rpc);
-
     final long start = System.nanoTime();
     if (LOG.isTraceEnabled()) {
       if (response == null) {
@@ -199,7 +198,6 @@ public class RpcProxy {
             connection.getLogPrefix(), rpc);
       } else {
         RpcHeader.ResponseHeader header = response.getHeader();
-        Preconditions.checkNotNull(header);
         LOG.trace("{} received response with rpcId {}, size {} for RPC {}",
             connection.getLogPrefix(), header.getCallId(),
             response.getTotalResponseSize(), rpc);
@@ -210,6 +208,10 @@ public class RpcProxy {
         rpc.method(), RpcTraceFrame.Action.RECEIVE_FROM_SERVER).serverInfo(
             connection.getServerInfo());
     if (ex != null) {
+      if (ex instanceof InvalidAuthnTokenException) {
+        client.handleInvalidToken(rpc);
+        return;
+      }
       if (ex instanceof RecoverableException) {
         // This check is specifically for the ERROR_SERVER_TOO_BUSY, ERROR_UNAVAILABLE and alike.
         failOrRetryRpc(client, connection, rpc, (RecoverableException) ex);
@@ -280,8 +282,8 @@ public class RpcProxy {
           connection.getLogPrefix(), e, header.getCallId(), rpc);
     }
     if (LOG.isTraceEnabled()) {
-      LOG.trace("------------------<< LEAVING  DECODE <<------------------" +
-          " time elapsed: " + ((System.nanoTime() - start) / 1000) + "us");
+      LOG.trace("------------------<< LEAVING  DECODE <<------------------ time elapsed: {} us",
+          ((System.nanoTime() - start) / 1000));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/603c1578/java/kudu-client/src/main/java/org/apache/kudu/client/RpcTraceFrame.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RpcTraceFrame.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcTraceFrame.java
index 8ec226b..15cf40e 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RpcTraceFrame.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RpcTraceFrame.java
@@ -52,6 +52,12 @@ class RpcTraceFrame {
         sb.append(trace.getStatus());
       }
     },
+    // Waiting for a new authn token to re-send the request.
+    GET_NEW_AUTHENTICATION_TOKEN_THEN_RETRY {
+      void appendToStringBuilder(RpcTraceFrame trace, StringBuilder sb) {
+        sb.append("waiting for new authn token");
+      }
+    },
     // After having figured out that we don't know where the RPC is going,
     // before querying the master.
     QUERY_MASTER {

http://git-wip-us.apache.org/repos/asf/kudu/blob/603c1578/java/kudu-client/src/main/java/org/apache/kudu/client/SecurityContext.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/SecurityContext.java b/java/kudu-client/src/main/java/org/apache/kudu/client/SecurityContext.java
index ac75881..0340bfd 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/SecurityContext.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/SecurityContext.java
@@ -51,6 +51,7 @@ import org.apache.kudu.security.Token.TokenPB;
  */
 class SecurityContext {
   @GuardedBy("this")
+  @Nullable
   private SignedTokenPB authnToken;
 
   private final DelegatedTrustManager trustManager = new DelegatedTrustManager();
@@ -76,7 +77,12 @@ class SecurityContext {
    */
   private List<ByteString> trustedCertDers = Collections.emptyList();
 
-  public SecurityContext(Subject subject) {
+  /**
+   * Construct SecurityContext object with the specified JAAS subject.
+   *
+   * @param subject JAAS Subject that the client's credentials are stored in
+   */
+  SecurityContext(Subject subject) {
     try {
       this.subject = subject;
 
@@ -85,16 +91,17 @@ class SecurityContext {
 
       this.sslContextTrustAny = SSLContext.getInstance("TLS");
       sslContextTrustAny.init(null, new TrustManager[] { new TrustAnyCert() }, null);
-
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
   }
 
+  @Nullable
   public Subject getSubject() {
     return subject;
   }
 
+  @Nullable
   public synchronized byte[] exportAuthenticationCredentials() {
     if (authnToken == null || !hasTrustedCerts()) {
       return null;
@@ -148,6 +155,7 @@ class SecurityContext {
   /**
    * @return the current authentication token, or null if we have no valid token
    */
+  @Nullable
   public synchronized SignedTokenPB getAuthenticationToken() {
     return authnToken;
   }
@@ -260,7 +268,7 @@ class SecurityContext {
    * can be swapped out atomically.
    */
   private static class DelegatedTrustManager implements X509TrustManager {
-    AtomicReference<X509TrustManager> delegate = new AtomicReference<>();
+    final AtomicReference<X509TrustManager> delegate = new AtomicReference<>();
 
     @Override
     public void checkClientTrusted(X509Certificate[] chain, String authType)
@@ -279,4 +287,5 @@ class SecurityContext {
       return delegate.get().getAcceptedIssuers();
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/603c1578/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
index 5e33203..bd1c0e9 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduClient.java
@@ -102,7 +102,7 @@ public class TestAsyncKuduClient extends BaseKuduTest {
       boolean sleep = false;
       if (!client.getConnectionListCopy().isEmpty()) {
         for (Connection c : client.getConnectionListCopy()) {
-          if (!c.isDisconnected()) {
+          if (!c.isTerminated()) {
             sleep = true;
             break;
           }

http://git-wip-us.apache.org/repos/asf/kudu/blob/603c1578/java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthnTokenReacquire.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthnTokenReacquire.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthnTokenReacquire.java
new file mode 100644
index 0000000..cfe9b5a
--- /dev/null
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthnTokenReacquire.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+package org.apache.kudu.client;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * This test contains scenarios to verify that client re-acquires authn token upon expiration
+ * of the current one and automatically retries the call.
+ */
+public class TestAuthnTokenReacquire extends BaseKuduTest {
+
+  private static final String TABLE_NAME = "TestAuthnTokenReacquire-table";
+  private static final int TOKEN_TTL_SEC = 1;
+  private static final int OP_TIMEOUT_MS = 60 * TOKEN_TTL_SEC * 1000;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // Inject additional INVALID_AUTHENTICATION_TOKEN responses from both the master and tablet
+    // servers, even for not-yet-expired tokens.
+    miniClusterBuilder
+        .enableKerberos()
+        .addMasterFlag(String.format("--authn_token_validity_seconds=%d", TOKEN_TTL_SEC))
+        .addMasterFlag("--rpc_inject_invalid_authn_token_ratio=0.5")
+        .addTserverFlag("--rpc_inject_invalid_authn_token_ratio=0.5");
+
+    BaseKuduTest.setUpBeforeClass();
+  }
+
+  private static void dropConnections() {
+    for (Connection c : client.getConnectionListCopy()) {
+      c.disconnect();
+    }
+  }
+
+  private static void dropConnectionsAndExpireToken() throws InterruptedException {
+    // Drop all connections from the client to Kudu servers.
+    dropConnections();
+    // Wait for authn token expiration.
+    Thread.sleep(TOKEN_TTL_SEC * 1000);
+  }
+
+  @Test
+  public void testBasicMasterOperations() throws Exception {
+    // To ratchet up the intensity a bit, run the scenario by several concurrent threads.
+    List<Thread> threads = new ArrayList<>();
+    final Map<Integer, Throwable> exceptions =
+        Collections.synchronizedMap(new HashMap<Integer, Throwable>());
+    for (int i = 0; i < 8; ++i) {
+      final int threadIdx = i;
+      Thread thread = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          final String tableName = "TestAuthnTokenReacquire-table-" + threadIdx;
+          try {
+            ListTabletServersResponse response = syncClient.listTabletServers();
+            assertNotNull(response);
+            dropConnectionsAndExpireToken();
+
+            ListTablesResponse tableList = syncClient.getTablesList(tableName);
+            assertNotNull(tableList);
+            assertTrue(tableList.getTablesList().isEmpty());
+            dropConnectionsAndExpireToken();
+
+            syncClient.createTable(tableName, basicSchema, getBasicCreateTableOptions());
+            dropConnectionsAndExpireToken();
+
+            KuduTable table = syncClient.openTable(tableName);
+            assertEquals(basicSchema.getColumnCount(), table.getSchema().getColumnCount());
+            dropConnectionsAndExpireToken();
+
+            syncClient.deleteTable(tableName);
+            assertFalse(syncClient.tableExists(tableName));
+          } catch (Throwable e) {
+            //noinspection ThrowableResultOfMethodCallIgnored
+            exceptions.put(threadIdx, e);
+          }
+        }
+      });
+      thread.run();
+      threads.add(thread);
+    }
+    for (Thread thread : threads) {
+      thread.join();
+    }
+    if (!exceptions.isEmpty()) {
+      for (Map.Entry<Integer, Throwable> e : exceptions.entrySet()) {
+        LOG.error("exception in thread {}: {}", e.getKey(), e.getValue());
+      }
+      fail("test failed: unexpected errors");
+    }
+  }
+
+  @Test
+  public void testBasicWorkflow() throws Exception {
+    KuduTable table = syncClient.createTable(TABLE_NAME, basicSchema,
+        getBasicCreateTableOptions());
+    dropConnectionsAndExpireToken();
+
+    KuduSession session = syncClient.newSession();
+    session.setTimeoutMillis(OP_TIMEOUT_MS);
+    session.apply(createBasicSchemaInsert(table, 1));
+    session.flush();
+    RowErrorsAndOverflowStatus errors = session.getPendingErrors();
+    assertFalse(errors.isOverflowed());
+    assertEquals(0, session.countPendingErrors());
+    dropConnectionsAndExpireToken();
+
+    KuduTable scanTable = syncClient.openTable(TABLE_NAME);
+    AsyncKuduScanner scanner = new AsyncKuduScanner.AsyncKuduScannerBuilder(client, scanTable)
+        .scanRequestTimeout(OP_TIMEOUT_MS)
+        .build();
+    assertEquals(1, countRowsInScan(scanner));
+    dropConnectionsAndExpireToken();
+
+    syncClient.deleteTable(TABLE_NAME);
+    assertFalse(syncClient.tableExists(TABLE_NAME));
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/603c1578/java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthnTokenReacquireOpen.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthnTokenReacquireOpen.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthnTokenReacquireOpen.java
new file mode 100644
index 0000000..d20d534
--- /dev/null
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAuthnTokenReacquireOpen.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+package org.apache.kudu.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * This test contains a special scenario to make sure the automatic authn token re-acquisition works
+ * in the case when the client has established a connection to the master using secondary
+ * credentials. The subtlety is that an authn token cannot be acquired using such a connection,
+ * so this test verifies that the client opens a new connection using its primary credentials to
+ * acquire a new authentication token and automatically retries its RPCs with the new authn token.
+ */
+public class TestAuthnTokenReacquireOpen extends BaseKuduTest {
+
+  private static final String TABLE_NAME = "TestAuthnTokenReacquireOpen-table";
+  private static final int TOKEN_TTL_SEC = 1;
+  private static final int OP_TIMEOUT_MS = 60 * TOKEN_TTL_SEC * 1000;
+  private static final int KEEPALIVE_TIME_MS = 2 * OP_TIMEOUT_MS;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // Set appropriate TTL for authn token and connection keep-alive property, so the client could
+    // keep an open connection to the master when its authn token is already expired. Inject
+    // additional INVALID_AUTHENTICATION_TOKEN responses from the tablet server even for
+    // not-yet-expired tokens for an extra stress on the client.
+    miniClusterBuilder
+        .enableKerberos()
+        .addMasterFlag(String.format("--authn_token_validity_seconds=%d", TOKEN_TTL_SEC))
+        .addMasterFlag(String.format("--rpc_default_keepalive_time_ms=%d", KEEPALIVE_TIME_MS))
+        .addTserverFlag(String.format("--rpc_default_keepalive_time_ms=%d", KEEPALIVE_TIME_MS))
+        .addTserverFlag("--rpc_inject_invalid_authn_token_ratio=0.5");
+
+    // We want to have a cluster with a single master.
+    final int NUM_MASTERS = 1;
+    final int NUM_TABLET_SERVERS = 3;
+    doSetup(NUM_MASTERS, NUM_TABLET_SERVERS);
+  }
+
+  private static void dropConnections() {
+    for (Connection c : client.getConnectionListCopy()) {
+      c.disconnect();
+    }
+  }
+
+  private static void expireToken() throws InterruptedException {
+    // Wait for authn token expiration.
+    Thread.sleep(TOKEN_TTL_SEC * 1000);
+  }
+
+  @Test
+  public void test() throws Exception {
+    // Establish a connection to the cluster, get the list of tablet servers. That would fetch
+    // an authn token.
+    ListTabletServersResponse response = syncClient.listTabletServers();
+    assertNotNull(response);
+    dropConnections();
+
+    // The connection to the master has been dropped. Make a call to the master again so the client
+    // would create a new connection using authn token.
+    ListTablesResponse tableList = syncClient.getTablesList(null);
+    assertNotNull(tableList);
+    assertTrue(tableList.getTablesList().isEmpty());
+
+    syncClient.createTable(TABLE_NAME, basicSchema, getBasicCreateTableOptions());
+    assertTrue(syncClient.tableExists(TABLE_NAME));
+
+    expireToken();
+
+    // Try scan table rows once the authn token has expired. This request goes to corresponding
+    // tablet server, and a new connection should be negotiated. During connection negotiation,
+    // the server authenticates the client using authn token, which is expired.
+    KuduTable scanTable = syncClient.openTable(TABLE_NAME);
+    AsyncKuduScanner scanner = new AsyncKuduScanner.AsyncKuduScannerBuilder(client, scanTable)
+        .scanRequestTimeout(OP_TIMEOUT_MS)
+        .build();
+    assertEquals(0, countRowsInScan(scanner));
+
+    syncClient.deleteTable(TABLE_NAME);
+    assertFalse(syncClient.tableExists(TABLE_NAME));
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/603c1578/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
index 2404005..5b5d4cf 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectionCache.java
@@ -64,18 +64,18 @@ public class TestConnectionCache {
 
       // 1 tserver and 3 masters and 3 connections from the newRpcProxy() in the loop above.
       assertEquals(1 + 3 + 3, client.getConnectionListCopy().size());
-      assertFalse(allConnectionsDisconnected(client));
+      assertFalse(allConnectionsTerminated(client));
 
       final RpcProxy proxy = client.newRpcProxy(serverInfos.get(0));
 
       // Disconnect from the server.
       proxy.getConnection().disconnect().awaitUninterruptibly();
-      waitForConnectionToClose(proxy.getConnection());
-      assertTrue(proxy.getConnection().isDisconnected());
+      waitForConnectionToTerminate(proxy.getConnection());
+      assertTrue(proxy.getConnection().isTerminated());
 
       // Make sure not all the connections in the connection cache are disconnected yet. Actually,
       // only the connection to server '0' should be disconnected.
-      assertFalse(allConnectionsDisconnected(client));
+      assertFalse(allConnectionsTerminated(client));
 
       // For a new RpcProxy instance, a new connection to the same destination is established.
       final RpcProxy newHelper = client.newRpcProxy(serverInfos.get(0));
@@ -96,9 +96,9 @@ public class TestConnectionCache {
       // Test disconnecting and make sure we cleaned up all the connections.
       for (Connection c : client.getConnectionListCopy()) {
         c.disconnect().awaitUninterruptibly();
-        waitForConnectionToClose(c);
+        waitForConnectionToTerminate(c);
       }
-      assertTrue(allConnectionsDisconnected(client));
+      assertTrue(allConnectionsTerminated(client));
     } finally {
       if (cluster != null) {
         cluster.shutdown();
@@ -106,19 +106,19 @@ public class TestConnectionCache {
     }
   }
 
-  private boolean allConnectionsDisconnected(AsyncKuduClient client) {
+  private boolean allConnectionsTerminated(AsyncKuduClient client) {
     for (Connection c : client.getConnectionListCopy()) {
-      if (!c.isDisconnected()) {
+      if (!c.isTerminated()) {
         return false;
       }
     }
     return true;
   }
 
-  private void waitForConnectionToClose(Connection c) throws InterruptedException {
+  private void waitForConnectionToTerminate(Connection c) throws InterruptedException {
     DeadlineTracker deadlineTracker = new DeadlineTracker();
     deadlineTracker.setDeadline(5000);
-    while (!c.isDisconnected() && !deadlineTracker.timedOut()) {
+    while (!c.isTerminated() && !deadlineTracker.timedOut()) {
       Thread.sleep(250);
     }
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/603c1578/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
index ac3e52f..d10f7b0 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
@@ -48,7 +48,7 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.kudu.client.Negotiator.Result;
+import org.apache.kudu.client.Negotiator.Success;
 import org.apache.kudu.rpc.RpcHeader.AuthenticationTypePB;
 import org.apache.kudu.rpc.RpcHeader.ConnectionContextPB;
 import org.apache.kudu.rpc.RpcHeader.NegotiatePB;
@@ -102,7 +102,7 @@ public class TestNegotiator {
   }
 
   private void startNegotiation(boolean fakeLoopback) {
-    Negotiator negotiator = new Negotiator("127.0.0.1", secContext);
+    Negotiator negotiator = new Negotiator("127.0.0.1", secContext, false);
     negotiator.overrideLoopbackForTests = fakeLoopback;
     embedder = new DecoderEmbedder<Object>(negotiator);
     negotiator.sendHello(embedder.getPipeline().getChannel());
@@ -135,19 +135,19 @@ public class TestNegotiator {
 
   /**
    * Checks that the client sends a connection context and then yields
-   * a Negotiation.Result to the pipeline.
+   * a Negotiation.Success to the pipeline.
    * @return the result
    */
-  private Result assertComplete() {
+  private Success assertComplete() {
     RpcOutboundMessage msg = (RpcOutboundMessage)embedder.poll();
     ConnectionContextPB connCtx = (ConnectionContextPB)msg.getBody();
     assertEquals(Negotiator.CONNECTION_CTX_CALL_ID, msg.getHeaderBuilder().getCallId());
     assertEquals(System.getProperty("user.name"), connCtx.getDEPRECATEDUserInfo().getRealUser());
 
-    // Expect the client to also emit a negotiation Result.
-    Result result = (Result)embedder.poll();
-    assertNotNull(result);
-    return result;
+    // Expect the client to also emit a negotiation Success.
+    Success success = (Success)embedder.poll();
+    assertNotNull(success);
+    return success;
   }
 
   @Test