You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by da...@apache.org on 2017/08/31 05:12:07 UTC

kudu git commit: java-client: improve error messages when failing to connect to secure cluster

Repository: kudu
Updated Branches:
  refs/heads/branch-1.5.x 2de07e594 -> fcbfcc746


java-client: improve error messages when failing to connect to secure cluster

Cleans up the error handling in Connection.java so that is passes
through non-recoverable exceptions, and improves the error which results
from attempting to connect to a secure cluster with an unauthenticated
client. Example of the new error:

    org.apache.kudu.client.NonRecoverableException:
      Couldn't find a valid master in (nightly512-1.gce.cloudera.com:7051).
      Exceptions received:
          org.apache.kudu.client.NonRecoverableException:
              Server requires Kerberos, but this client is not authenticated (kinit)

Change-Id: I41c3229dcda284dce57cb6f6930efe3b50fa9698
Reviewed-on: http://gerrit.cloudera.org:8080/7824
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Tested-by: Kudu Jenkins
(cherry picked from commit 986e8de63d8687421c476de07c4e889129062637)
Reviewed-on: http://gerrit.cloudera.org:8080/7915
Reviewed-by: Dan Burkert <da...@apache.org>
Tested-by: Dan Burkert <da...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/fcbfcc74
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/fcbfcc74
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/fcbfcc74

Branch: refs/heads/branch-1.5.x
Commit: fcbfcc7469deaf60c90b74fbc3353c54dd3a4803
Parents: 2de07e5
Author: Dan Burkert <da...@apache.org>
Authored: Thu Aug 24 17:47:30 2017 -0700
Committer: Dan Burkert <da...@apache.org>
Committed: Thu Aug 31 05:11:41 2017 +0000

----------------------------------------------------------------------
 .../apache/kudu/client/ConnectToCluster.java    |  4 +-
 .../java/org/apache/kudu/client/Connection.java | 78 +++++++++-------
 .../java/org/apache/kudu/client/Negotiator.java | 93 +++++++++++++-------
 .../org/apache/kudu/client/TestSecurity.java    | 13 ++-
 4 files changed, 110 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/fcbfcc74/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
index 3f9c9e5..832bbbc 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
@@ -216,8 +216,8 @@ final class ConnectToCluster {
       if (allUnrecoverable) {
         // This will stop retries.
         String msg = String.format("Couldn't find a valid master in (%s). " +
-            "Exceptions received: %s", allHosts,
-            Joiner.on(",").join(Lists.transform(
+            "Exceptions received: [%s]", allHosts,
+            Joiner.on(", ").join(Lists.transform(
                 exceptionsReceived, Functions.toStringFunction())));
         Status s = Status.ServiceUnavailable(msg);
         responseD.callback(new NonRecoverableException(s));

http://git-wip-us.apache.org/repos/asf/kudu/blob/fcbfcc74/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
index c9cd594..f21d2bb 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
@@ -238,20 +238,18 @@ class Connection extends SimpleChannelUpstreamHandler {
 
   /** {@inheritDoc} */
   @Override
-  public void channelDisconnected(final ChannelHandlerContext ctx,
-                                  final ChannelStateEvent e) throws Exception {
+  public void channelDisconnected(final ChannelHandlerContext ctx, final ChannelStateEvent e) {
     // No need to call super.channelDisconnected(ctx, e) -- there should be nobody in the upstream
     // pipeline after Connection itself. So, just handle the disconnection event ourselves.
-    cleanup("connection disconnected");
+    cleanup(new RecoverableException(Status.NetworkError("connection disconnected")));
   }
 
   /** {@inheritDoc} */
   @Override
-  public void channelClosed(final ChannelHandlerContext ctx,
-                            final ChannelStateEvent e) throws Exception {
+  public void channelClosed(final ChannelHandlerContext ctx, final ChannelStateEvent e) {
     // No need to call super.channelClosed(ctx, e) -- there should be nobody in the upstream
     // pipeline after Connection itself. So, just handle the close event ourselves.
-    cleanup("connection closed");
+    cleanup(new RecoverableException(Status.NetworkError("connection closed")));
   }
 
   /** {@inheritDoc} */
@@ -382,36 +380,48 @@ class Connection extends SimpleChannelUpstreamHandler {
 
   /** {@inheritDoc} */
   @Override
-  public void exceptionCaught(final ChannelHandlerContext ctx,
-                              final ExceptionEvent event) {
-    final Throwable e = event.getCause();
-    final Channel c = event.getChannel();
-
-    if (e instanceof RejectedExecutionException) {
-      LOG.warn("{} RPC rejected by the executor: {} (ignore if shutting down)", getLogPrefix(), e);
+  public void exceptionCaught(final ChannelHandlerContext ctx, final ExceptionEvent event) {
+    Throwable e = event.getCause();
+    Channel c = event.getChannel();
+
+    KuduException error;
+    if (e instanceof KuduException) {
+      error = (KuduException) e;
+    } else if (e instanceof RejectedExecutionException) {
+      String message = String.format("%s RPC rejected by the executor (ignore if shutting down)",
+                                     getLogPrefix());
+      error = new RecoverableException(Status.NetworkError(message), e);
+      LOG.warn(message, e);
     } else if (e instanceof ReadTimeoutException) {
-      LOG.debug("{} encountered a read timeout; closing the channel", getLogPrefix());
+      String message = String.format("%s encountered a read timeout; closing the channel",
+                                     getLogPrefix());
+      error = new RecoverableException(Status.NetworkError(message), e);
+      LOG.debug(message);
     } else if (e instanceof ClosedChannelException) {
-      if (!explicitlyDisconnected) {
-        LOG.info("{} lost connection to peer", getLogPrefix());
-      }
-    } else if (e instanceof SSLException && explicitlyDisconnected) {
+      String message = String.format(
+          explicitlyDisconnected ? "%s disconnected from peer" : "%s lost connection to peer",
+          getLogPrefix());
+      error = new RecoverableException(Status.NetworkError(message), e);
+      LOG.info(message, e);
+    } else {
+      String message = String.format("%s unexpected exception from downstream on %s",
+                                     getLogPrefix(), c);
+      error = new RecoverableException(Status.NetworkError(message), e);
+
       // There's a race in Netty where, when we call Channel.close(), it tries
       // to send a TLS 'shutdown' message and enters a shutdown state. If another
       // thread races to send actual data on the channel, then Netty will get a
       // bit confused that we are trying to send data and misinterpret it as a
       // renegotiation attempt, and throw an SSLException. So, we just ignore any
-      // SSLException if we've already attempted to close.
-    } else {
-      LOG.error("{} unexpected exception from downstream on {}: {}", getLogPrefix(), c, e);
+      // SSLException if we've already attempted to close, otherwise log the error.
+      if (!(e instanceof SSLException) || !explicitlyDisconnected) {
+        LOG.error(message, e);
+      }
     }
 
+    cleanup(error);
     if (c.isOpen()) {
-      // Calling Channels.close() will trigger channelClosed(), which will call cleanup().
       Channels.close(c);
-    } else {
-      // Presumably a connection timeout: initiating the clean-up directly from here.
-      cleanup(e.getMessage());
     }
   }
 
@@ -615,9 +625,9 @@ class Connection extends SimpleChannelUpstreamHandler {
    * callbacks. The callee is supposed to handle the error and retry sending the messages,
    * if needed.
    *
-   * @param errorMessage a string to describe the cause of the cleanup
+   * @param error the exception which caused the connection cleanup
    */
-  private void cleanup(final String errorMessage) {
+  private void cleanup(KuduException error) {
     List<QueuedMessage> queued;
     Map<Integer, Callback<Void, CallResponseInfo>> inflight;
 
@@ -636,7 +646,8 @@ class Connection extends SimpleChannelUpstreamHandler {
         needNewAuthnToken = negotiationFailure.status.getCode().equals(
             RpcHeader.ErrorStatusPB.RpcErrorCodePB.FATAL_INVALID_AUTHENTICATION_TOKEN);
       }
-      LOG.debug("{} cleaning up while in state {} due to: {}", getLogPrefix(), state, errorMessage);
+      LOG.debug("{} cleaning up while in state {} due to: {}",
+                getLogPrefix(), state, error.getMessage());
 
       queued = queuedMessages;
       queuedMessages = null;
@@ -648,14 +659,13 @@ class Connection extends SimpleChannelUpstreamHandler {
     } finally {
       lock.unlock();
     }
-    final Status error = Status.NetworkError(getLogPrefix() + " " +
-        (errorMessage == null ? "connection reset" : errorMessage));
-    final RecoverableException exception =
-        needNewAuthnToken ? new InvalidAuthnTokenException(error) : new RecoverableException(error);
+    if (needNewAuthnToken) {
+      error = new InvalidAuthnTokenException(error.getStatus());
+    }
 
     for (Callback<Void, CallResponseInfo> cb : inflight.values()) {
       try {
-        cb.call(new CallResponseInfo(null, exception));
+        cb.call(new CallResponseInfo(null, error));
       } catch (Exception e) {
         LOG.warn("{} exception while aborting in-flight call: {}", getLogPrefix(), e);
       }
@@ -664,7 +674,7 @@ class Connection extends SimpleChannelUpstreamHandler {
     if (queued != null) {
       for (QueuedMessage qm : queued) {
         try {
-          qm.cb.call(new CallResponseInfo(null, exception));
+          qm.cb.call(new CallResponseInfo(null, error));
         } catch (Exception e) {
           LOG.warn("{} exception while aborting enqueued call: {}", getLogPrefix(), e);
         }

http://git-wip-us.apache.org/repos/asf/kudu/blob/fcbfcc74/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
index 4736b37..ad62b9f 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
@@ -31,6 +31,7 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import java.security.PrivilegedActionException;
 import java.security.PrivilegedExceptionAction;
 import java.security.cert.Certificate;
 import java.util.List;
@@ -52,7 +53,6 @@ import javax.security.sasl.SaslException;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -60,6 +60,7 @@ import com.google.common.collect.Sets;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.UnsafeByteOperations;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.ietf.jgss.GSSException;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.Channel;
@@ -106,7 +107,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
   static final int CONNECTION_CTX_CALL_ID = -3;
   static final int SASL_CALL_ID = -33;
 
-  private static enum State {
+  private enum State {
     INITIAL,
     AWAIT_NEGOTIATE,
     AWAIT_TLS_HANDSHAKE,
@@ -221,8 +222,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
   }
 
   @Override
-  public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
-      throws Exception {
+  public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt) throws IOException {
     Object m = evt.getMessage();
     if (!(m instanceof CallResponse)) {
       ctx.sendUpstream(evt);
@@ -231,8 +231,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
     handleResponse(ctx.getChannel(), (CallResponse)m);
   }
 
-  private void handleResponse(Channel chan, CallResponse callResponse)
-      throws IOException {
+  private void handleResponse(Channel chan, CallResponse callResponse) throws IOException {
     final RpcHeader.ResponseHeader header = callResponse.getHeader();
     if (header.getIsError()) {
       final RpcHeader.ErrorStatusPB.Builder errBuilder = RpcHeader.ErrorStatusPB.newBuilder();
@@ -270,8 +269,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
     }
   }
 
-  private void handleSaslMessage(Channel chan, NegotiatePB response)
-      throws IOException {
+  private void handleSaslMessage(Channel chan, NegotiatePB response) throws IOException {
     switch (response.getStep()) {
       case SASL_CHALLENGE:
         handleChallengeResponse(chan, response);
@@ -298,8 +296,8 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
     return saslBuilder.build();
   }
 
-  private void handleNegotiateResponse(Channel chan, RpcHeader.NegotiatePB response) throws
-      SaslException, SSLException {
+  private void handleNegotiateResponse(Channel chan,
+                                       RpcHeader.NegotiatePB response) throws IOException {
     Preconditions.checkState(response.getStep() == NegotiateStep.NEGOTIATE,
         "Expected NEGOTIATE message, got {}", response.getStep());
 
@@ -343,7 +341,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
     }
   }
 
-  private void chooseAndInitializeSaslMech(NegotiatePB response) throws SaslException {
+  private void chooseAndInitializeSaslMech(NegotiatePB response) throws NonRecoverableException {
     // Gather the set of server-supported mechanisms.
     Set<String> serverMechs = Sets.newHashSet();
     for (RpcHeader.NegotiatePB.SaslMechanism mech : response.getSaslMechanismsList()) {
@@ -367,6 +365,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
       if ("GSSAPI".equals(clientMech)) {
         props.put(Sasl.QOP, "auth-int");
       }
+
       try {
         saslClient = Sasl.createSaslClient(new String[]{ clientMech },
                                            null,
@@ -378,14 +377,26 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
         break;
       } catch (SaslException e) {
         errorsByMech.put(clientMech, e.getMessage());
-        saslClient = null;
       }
     }
-    if (chosenMech == null) {
-      throw new SaslException("unable to negotiate a matching mechanism. Errors: [" +
-                              Joiner.on(",").withKeyValueSeparator(": ").join(errorsByMech) +
-                              "]");
+
+    if (chosenMech != null) {
+      LOG.debug("SASL mechanism {} chosen for peer {}", chosenMech, remoteHostname);
+      return;
     }
+
+    // TODO(KUDU-1948): when the Java client has an option to require security, detect the case
+    // where the server is configured without Kerberos and the client requires it.
+    String message;
+    if (serverMechs.size() == 1 && serverMechs.contains("GSSAPI")) {
+      // Give a better error diagnostic for common case of an unauthenticated client connecting
+      // to a secure server.
+      message = "Server requires Kerberos, but this client is not authenticated (kinit)";
+    } else {
+      message = "Unable to negotiate a matching mechanism. Errors: [" +
+                Joiner.on(", ").withKeyValueSeparator(": ").join(errorsByMech) + "]";
+    }
+    throw new NonRecoverableException(Status.ConfigurationError(message));
   }
 
   private AuthenticationTypePB.TypeCase chooseAuthenticationType(NegotiatePB response) {
@@ -454,8 +465,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
    * Handle an inbound message during the TLS handshake. If this message
    * causes the handshake to complete, triggers the beginning of SASL initiation.
    */
-  private void handleTlsMessage(Channel chan, NegotiatePB response)
-      throws IOException {
+  private void handleTlsMessage(Channel chan, NegotiatePB response) throws IOException {
     Preconditions.checkState(response.getStep() == NegotiateStep.TLS_HANDSHAKE);
     Preconditions.checkArgument(!response.getTlsHandshake().isEmpty(),
         "empty TLS message from server");
@@ -529,7 +539,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
         .build());
   }
 
-  private void startAuthentication(Channel chan) throws SaslException {
+  private void startAuthentication(Channel chan) throws SaslException, NonRecoverableException {
     switch (chosenAuthnType) {
       case SASL:
         sendSaslInitiate(chan);
@@ -556,8 +566,8 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
     sendSaslMessage(chan, builder.build());
   }
 
-  private void handleTokenExchangeResponse(Channel chan,
-                                           NegotiatePB response) throws SaslException {
+  private void handleTokenExchangeResponse(Channel chan, NegotiatePB response)
+      throws SaslException {
     Preconditions.checkArgument(response.getStep() == NegotiateStep.TOKEN_EXCHANGE,
         "expected TOKEN_EXCHANGE, got step: {}", response.getStep());
 
@@ -565,7 +575,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
     finish(chan);
   }
 
-  private void sendSaslInitiate(Channel chan) throws SaslException {
+  private void sendSaslInitiate(Channel chan) throws SaslException, NonRecoverableException {
     RpcHeader.NegotiatePB.Builder builder = RpcHeader.NegotiatePB.newBuilder();
     if (saslClient.hasInitialResponse()) {
       byte[] initialResponse = evaluateChallenge(new byte[0]);
@@ -577,8 +587,8 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
     sendSaslMessage(chan, builder.build());
   }
 
-  private void handleChallengeResponse(Channel chan, RpcHeader.NegotiatePB response) throws
-      SaslException {
+  private void handleChallengeResponse(Channel chan, RpcHeader.NegotiatePB response)
+      throws SaslException, NonRecoverableException {
     byte[] saslToken = evaluateChallenge(response.getToken().toByteArray());
     if (saslToken == null) {
       throw new IllegalStateException("Not expecting an empty token");
@@ -592,7 +602,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
   /**
    * Verify the channel bindings included in 'response'. This is used only
    * for GSSAPI-authenticated connections over TLS.
-   * @throws RuntimeException on failure to verify
+   * @throws SSLPeerUnverifiedException on failure to verify
    */
   private void verifyChannelBindings(NegotiatePB response) throws IOException {
     byte[] expected = SecurityUtil.getEndpointChannelBindings(peerCert);
@@ -671,18 +681,33 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
     return new RpcOutboundMessage(header, pb);
   }
 
-  private byte[] evaluateChallenge(final byte[] challenge) throws SaslException {
+  private byte[] evaluateChallenge(final byte[] challenge)
+      throws SaslException, NonRecoverableException {
     try {
       return Subject.doAs(securityContext.getSubject(),
           new PrivilegedExceptionAction<byte[]>() {
-          @Override
-          public byte[] run() throws Exception {
-            return saslClient.evaluateChallenge(challenge);
-          }
-        });
-    } catch (Exception e) {
-      Throwables.throwIfInstanceOf(e, SaslException.class);
-      throw new RuntimeException(e);
+            @Override
+            public byte[] run() throws SaslException {
+              return saslClient.evaluateChallenge(challenge);
+            }
+          });
+    } catch (PrivilegedActionException e) {
+      // This cast is safe because the action above only throws checked SaslException.
+      SaslException saslException = (SaslException) e.getCause();
+
+      // TODO(KUDU-2121): We should never get to this point if the client does not have
+      // Kerberos credentials, but it seems that on certain platforms it can happen.
+      // So, we try and determine whether the evaluateChallenge failed due to missing
+      // credentials, and return a nicer error message if so.
+      Throwable cause = saslException.getCause();
+      if (cause instanceof GSSException &&
+          ((GSSException) cause).getMajor() == GSSException.NO_CRED) {
+        throw new NonRecoverableException(
+            Status.ConfigurationError(
+                "Server requires Kerberos, but this client is not authenticated (kinit)"),
+            saslException);
+      }
+      throw saslException;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/fcbfcc74/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java
index 35fc389..50b566a 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java
@@ -39,24 +39,22 @@ public class TestSecurity extends BaseKuduTest {
    * be able to authenticate to the masters and tablet servers using tokens.
    */
   @Test
-  public void testImportExportAuthenticationCredentials() throws InterruptedException, Exception {
+  public void testImportExportAuthenticationCredentials() throws Exception {
     byte[] authnData = client.exportAuthenticationCredentials().join();
     assertNotNull(authnData);
     String oldTicketCache = System.getProperty(SecurityUtil.KUDU_TICKETCACHE_PROPERTY);
     System.clearProperty(SecurityUtil.KUDU_TICKETCACHE_PROPERTY);
     try {
-      KuduClient newClient = new KuduClient.KuduClientBuilder(masterAddresses)
-          .defaultAdminOperationTimeoutMs(2000)
-          .build();
+      KuduClient newClient = new KuduClient.KuduClientBuilder(masterAddresses).build();
 
       // Test that a client with no credentials cannot list servers.
       try {
         newClient.listTabletServers();
         Assert.fail("should not have been able to connect to a secure cluster " +
             "with no credentials");
-      } catch (Exception e) {
-        // Expected.
-        // TODO(todd): assert on the particular exception type and error string
+      } catch (NonRecoverableException e) {
+        Assert.assertTrue(e.getMessage().contains(
+            "Server requires Kerberos, but this client is not authenticated"));
       }
 
       // If we import the authentication data from the old authenticated client,
@@ -72,5 +70,4 @@ public class TestSecurity extends BaseKuduTest {
       System.setProperty(SecurityUtil.KUDU_TICKETCACHE_PROPERTY, oldTicketCache);
     }
   }
-
 }