You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by da...@apache.org on 2017/08/31 05:12:07 UTC
kudu git commit: java-client: improve error messages when failing to
connect to secure cluster
Repository: kudu
Updated Branches:
refs/heads/branch-1.5.x 2de07e594 -> fcbfcc746
java-client: improve error messages when failing to connect to secure cluster
Cleans up the error handling in Connection.java so that it passes
through non-recoverable exceptions, and improves the error which results
from attempting to connect to a secure cluster with an unauthenticated
client. Example of the new error:
org.apache.kudu.client.NonRecoverableException:
Couldn't find a valid master in (nightly512-1.gce.cloudera.com:7051).
Exceptions received:
org.apache.kudu.client.NonRecoverableException:
Server requires Kerberos, but this client is not authenticated (kinit)
Change-Id: I41c3229dcda284dce57cb6f6930efe3b50fa9698
Reviewed-on: http://gerrit.cloudera.org:8080/7824
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Tested-by: Kudu Jenkins
(cherry picked from commit 986e8de63d8687421c476de07c4e889129062637)
Reviewed-on: http://gerrit.cloudera.org:8080/7915
Reviewed-by: Dan Burkert <da...@apache.org>
Tested-by: Dan Burkert <da...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/fcbfcc74
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/fcbfcc74
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/fcbfcc74
Branch: refs/heads/branch-1.5.x
Commit: fcbfcc7469deaf60c90b74fbc3353c54dd3a4803
Parents: 2de07e5
Author: Dan Burkert <da...@apache.org>
Authored: Thu Aug 24 17:47:30 2017 -0700
Committer: Dan Burkert <da...@apache.org>
Committed: Thu Aug 31 05:11:41 2017 +0000
----------------------------------------------------------------------
.../apache/kudu/client/ConnectToCluster.java | 4 +-
.../java/org/apache/kudu/client/Connection.java | 78 +++++++++-------
.../java/org/apache/kudu/client/Negotiator.java | 93 +++++++++++++-------
.../org/apache/kudu/client/TestSecurity.java | 13 ++-
4 files changed, 110 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/fcbfcc74/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
index 3f9c9e5..832bbbc 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
@@ -216,8 +216,8 @@ final class ConnectToCluster {
if (allUnrecoverable) {
// This will stop retries.
String msg = String.format("Couldn't find a valid master in (%s). " +
- "Exceptions received: %s", allHosts,
- Joiner.on(",").join(Lists.transform(
+ "Exceptions received: [%s]", allHosts,
+ Joiner.on(", ").join(Lists.transform(
exceptionsReceived, Functions.toStringFunction())));
Status s = Status.ServiceUnavailable(msg);
responseD.callback(new NonRecoverableException(s));
http://git-wip-us.apache.org/repos/asf/kudu/blob/fcbfcc74/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
index c9cd594..f21d2bb 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
@@ -238,20 +238,18 @@ class Connection extends SimpleChannelUpstreamHandler {
/** {@inheritDoc} */
@Override
- public void channelDisconnected(final ChannelHandlerContext ctx,
- final ChannelStateEvent e) throws Exception {
+ public void channelDisconnected(final ChannelHandlerContext ctx, final ChannelStateEvent e) {
// No need to call super.channelDisconnected(ctx, e) -- there should be nobody in the upstream
// pipeline after Connection itself. So, just handle the disconnection event ourselves.
- cleanup("connection disconnected");
+ cleanup(new RecoverableException(Status.NetworkError("connection disconnected")));
}
/** {@inheritDoc} */
@Override
- public void channelClosed(final ChannelHandlerContext ctx,
- final ChannelStateEvent e) throws Exception {
+ public void channelClosed(final ChannelHandlerContext ctx, final ChannelStateEvent e) {
// No need to call super.channelClosed(ctx, e) -- there should be nobody in the upstream
// pipeline after Connection itself. So, just handle the close event ourselves.
- cleanup("connection closed");
+ cleanup(new RecoverableException(Status.NetworkError("connection closed")));
}
/** {@inheritDoc} */
@@ -382,36 +380,48 @@ class Connection extends SimpleChannelUpstreamHandler {
/** {@inheritDoc} */
@Override
- public void exceptionCaught(final ChannelHandlerContext ctx,
- final ExceptionEvent event) {
- final Throwable e = event.getCause();
- final Channel c = event.getChannel();
-
- if (e instanceof RejectedExecutionException) {
- LOG.warn("{} RPC rejected by the executor: {} (ignore if shutting down)", getLogPrefix(), e);
+ public void exceptionCaught(final ChannelHandlerContext ctx, final ExceptionEvent event) {
+ Throwable e = event.getCause();
+ Channel c = event.getChannel();
+
+ KuduException error;
+ if (e instanceof KuduException) {
+ error = (KuduException) e;
+ } else if (e instanceof RejectedExecutionException) {
+ String message = String.format("%s RPC rejected by the executor (ignore if shutting down)",
+ getLogPrefix());
+ error = new RecoverableException(Status.NetworkError(message), e);
+ LOG.warn(message, e);
} else if (e instanceof ReadTimeoutException) {
- LOG.debug("{} encountered a read timeout; closing the channel", getLogPrefix());
+ String message = String.format("%s encountered a read timeout; closing the channel",
+ getLogPrefix());
+ error = new RecoverableException(Status.NetworkError(message), e);
+ LOG.debug(message);
} else if (e instanceof ClosedChannelException) {
- if (!explicitlyDisconnected) {
- LOG.info("{} lost connection to peer", getLogPrefix());
- }
- } else if (e instanceof SSLException && explicitlyDisconnected) {
+ String message = String.format(
+ explicitlyDisconnected ? "%s disconnected from peer" : "%s lost connection to peer",
+ getLogPrefix());
+ error = new RecoverableException(Status.NetworkError(message), e);
+ LOG.info(message, e);
+ } else {
+ String message = String.format("%s unexpected exception from downstream on %s",
+ getLogPrefix(), c);
+ error = new RecoverableException(Status.NetworkError(message), e);
+
// There's a race in Netty where, when we call Channel.close(), it tries
// to send a TLS 'shutdown' message and enters a shutdown state. If another
// thread races to send actual data on the channel, then Netty will get a
// bit confused that we are trying to send data and misinterpret it as a
// renegotiation attempt, and throw an SSLException. So, we just ignore any
- // SSLException if we've already attempted to close.
- } else {
- LOG.error("{} unexpected exception from downstream on {}: {}", getLogPrefix(), c, e);
+ // SSLException if we've already attempted to close, otherwise log the error.
+ if (!(e instanceof SSLException) || !explicitlyDisconnected) {
+ LOG.error(message, e);
+ }
}
+ cleanup(error);
if (c.isOpen()) {
- // Calling Channels.close() will trigger channelClosed(), which will call cleanup().
Channels.close(c);
- } else {
- // Presumably a connection timeout: initiating the clean-up directly from here.
- cleanup(e.getMessage());
}
}
@@ -615,9 +625,9 @@ class Connection extends SimpleChannelUpstreamHandler {
* callbacks. The callee is supposed to handle the error and retry sending the messages,
* if needed.
*
- * @param errorMessage a string to describe the cause of the cleanup
+ * @param error the exception which caused the connection cleanup
*/
- private void cleanup(final String errorMessage) {
+ private void cleanup(KuduException error) {
List<QueuedMessage> queued;
Map<Integer, Callback<Void, CallResponseInfo>> inflight;
@@ -636,7 +646,8 @@ class Connection extends SimpleChannelUpstreamHandler {
needNewAuthnToken = negotiationFailure.status.getCode().equals(
RpcHeader.ErrorStatusPB.RpcErrorCodePB.FATAL_INVALID_AUTHENTICATION_TOKEN);
}
- LOG.debug("{} cleaning up while in state {} due to: {}", getLogPrefix(), state, errorMessage);
+ LOG.debug("{} cleaning up while in state {} due to: {}",
+ getLogPrefix(), state, error.getMessage());
queued = queuedMessages;
queuedMessages = null;
@@ -648,14 +659,13 @@ class Connection extends SimpleChannelUpstreamHandler {
} finally {
lock.unlock();
}
- final Status error = Status.NetworkError(getLogPrefix() + " " +
- (errorMessage == null ? "connection reset" : errorMessage));
- final RecoverableException exception =
- needNewAuthnToken ? new InvalidAuthnTokenException(error) : new RecoverableException(error);
+ if (needNewAuthnToken) {
+ error = new InvalidAuthnTokenException(error.getStatus());
+ }
for (Callback<Void, CallResponseInfo> cb : inflight.values()) {
try {
- cb.call(new CallResponseInfo(null, exception));
+ cb.call(new CallResponseInfo(null, error));
} catch (Exception e) {
LOG.warn("{} exception while aborting in-flight call: {}", getLogPrefix(), e);
}
@@ -664,7 +674,7 @@ class Connection extends SimpleChannelUpstreamHandler {
if (queued != null) {
for (QueuedMessage qm : queued) {
try {
- qm.cb.call(new CallResponseInfo(null, exception));
+ qm.cb.call(new CallResponseInfo(null, error));
} catch (Exception e) {
LOG.warn("{} exception while aborting enqueued call: {}", getLogPrefix(), e);
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/fcbfcc74/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
index 4736b37..ad62b9f 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
@@ -31,6 +31,7 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.security.cert.Certificate;
import java.util.List;
@@ -52,7 +53,6 @@ import javax.security.sasl.SaslException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -60,6 +60,7 @@ import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
import org.apache.yetus.audience.InterfaceAudience;
+import org.ietf.jgss.GSSException;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
@@ -106,7 +107,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
static final int CONNECTION_CTX_CALL_ID = -3;
static final int SASL_CALL_ID = -33;
- private static enum State {
+ private enum State {
INITIAL,
AWAIT_NEGOTIATE,
AWAIT_TLS_HANDSHAKE,
@@ -221,8 +222,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
}
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
- throws Exception {
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt) throws IOException {
Object m = evt.getMessage();
if (!(m instanceof CallResponse)) {
ctx.sendUpstream(evt);
@@ -231,8 +231,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
handleResponse(ctx.getChannel(), (CallResponse)m);
}
- private void handleResponse(Channel chan, CallResponse callResponse)
- throws IOException {
+ private void handleResponse(Channel chan, CallResponse callResponse) throws IOException {
final RpcHeader.ResponseHeader header = callResponse.getHeader();
if (header.getIsError()) {
final RpcHeader.ErrorStatusPB.Builder errBuilder = RpcHeader.ErrorStatusPB.newBuilder();
@@ -270,8 +269,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
}
}
- private void handleSaslMessage(Channel chan, NegotiatePB response)
- throws IOException {
+ private void handleSaslMessage(Channel chan, NegotiatePB response) throws IOException {
switch (response.getStep()) {
case SASL_CHALLENGE:
handleChallengeResponse(chan, response);
@@ -298,8 +296,8 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
return saslBuilder.build();
}
- private void handleNegotiateResponse(Channel chan, RpcHeader.NegotiatePB response) throws
- SaslException, SSLException {
+ private void handleNegotiateResponse(Channel chan,
+ RpcHeader.NegotiatePB response) throws IOException {
Preconditions.checkState(response.getStep() == NegotiateStep.NEGOTIATE,
"Expected NEGOTIATE message, got {}", response.getStep());
@@ -343,7 +341,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
}
}
- private void chooseAndInitializeSaslMech(NegotiatePB response) throws SaslException {
+ private void chooseAndInitializeSaslMech(NegotiatePB response) throws NonRecoverableException {
// Gather the set of server-supported mechanisms.
Set<String> serverMechs = Sets.newHashSet();
for (RpcHeader.NegotiatePB.SaslMechanism mech : response.getSaslMechanismsList()) {
@@ -367,6 +365,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
if ("GSSAPI".equals(clientMech)) {
props.put(Sasl.QOP, "auth-int");
}
+
try {
saslClient = Sasl.createSaslClient(new String[]{ clientMech },
null,
@@ -378,14 +377,26 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
break;
} catch (SaslException e) {
errorsByMech.put(clientMech, e.getMessage());
- saslClient = null;
}
}
- if (chosenMech == null) {
- throw new SaslException("unable to negotiate a matching mechanism. Errors: [" +
- Joiner.on(",").withKeyValueSeparator(": ").join(errorsByMech) +
- "]");
+
+ if (chosenMech != null) {
+ LOG.debug("SASL mechanism {} chosen for peer {}", chosenMech, remoteHostname);
+ return;
}
+
+ // TODO(KUDU-1948): when the Java client has an option to require security, detect the case
+ // where the server is configured without Kerberos and the client requires it.
+ String message;
+ if (serverMechs.size() == 1 && serverMechs.contains("GSSAPI")) {
+ // Give a better error diagnostic for common case of an unauthenticated client connecting
+ // to a secure server.
+ message = "Server requires Kerberos, but this client is not authenticated (kinit)";
+ } else {
+ message = "Unable to negotiate a matching mechanism. Errors: [" +
+ Joiner.on(", ").withKeyValueSeparator(": ").join(errorsByMech) + "]";
+ }
+ throw new NonRecoverableException(Status.ConfigurationError(message));
}
private AuthenticationTypePB.TypeCase chooseAuthenticationType(NegotiatePB response) {
@@ -454,8 +465,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
* Handle an inbound message during the TLS handshake. If this message
* causes the handshake to complete, triggers the beginning of SASL initiation.
*/
- private void handleTlsMessage(Channel chan, NegotiatePB response)
- throws IOException {
+ private void handleTlsMessage(Channel chan, NegotiatePB response) throws IOException {
Preconditions.checkState(response.getStep() == NegotiateStep.TLS_HANDSHAKE);
Preconditions.checkArgument(!response.getTlsHandshake().isEmpty(),
"empty TLS message from server");
@@ -529,7 +539,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
.build());
}
- private void startAuthentication(Channel chan) throws SaslException {
+ private void startAuthentication(Channel chan) throws SaslException, NonRecoverableException {
switch (chosenAuthnType) {
case SASL:
sendSaslInitiate(chan);
@@ -556,8 +566,8 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
sendSaslMessage(chan, builder.build());
}
- private void handleTokenExchangeResponse(Channel chan,
- NegotiatePB response) throws SaslException {
+ private void handleTokenExchangeResponse(Channel chan, NegotiatePB response)
+ throws SaslException {
Preconditions.checkArgument(response.getStep() == NegotiateStep.TOKEN_EXCHANGE,
"expected TOKEN_EXCHANGE, got step: {}", response.getStep());
@@ -565,7 +575,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
finish(chan);
}
- private void sendSaslInitiate(Channel chan) throws SaslException {
+ private void sendSaslInitiate(Channel chan) throws SaslException, NonRecoverableException {
RpcHeader.NegotiatePB.Builder builder = RpcHeader.NegotiatePB.newBuilder();
if (saslClient.hasInitialResponse()) {
byte[] initialResponse = evaluateChallenge(new byte[0]);
@@ -577,8 +587,8 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
sendSaslMessage(chan, builder.build());
}
- private void handleChallengeResponse(Channel chan, RpcHeader.NegotiatePB response) throws
- SaslException {
+ private void handleChallengeResponse(Channel chan, RpcHeader.NegotiatePB response)
+ throws SaslException, NonRecoverableException {
byte[] saslToken = evaluateChallenge(response.getToken().toByteArray());
if (saslToken == null) {
throw new IllegalStateException("Not expecting an empty token");
@@ -592,7 +602,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
/**
* Verify the channel bindings included in 'response'. This is used only
* for GSSAPI-authenticated connections over TLS.
- * @throws RuntimeException on failure to verify
+ * @throws SSLPeerUnverifiedException on failure to verify
*/
private void verifyChannelBindings(NegotiatePB response) throws IOException {
byte[] expected = SecurityUtil.getEndpointChannelBindings(peerCert);
@@ -671,18 +681,33 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
return new RpcOutboundMessage(header, pb);
}
- private byte[] evaluateChallenge(final byte[] challenge) throws SaslException {
+ private byte[] evaluateChallenge(final byte[] challenge)
+ throws SaslException, NonRecoverableException {
try {
return Subject.doAs(securityContext.getSubject(),
new PrivilegedExceptionAction<byte[]>() {
- @Override
- public byte[] run() throws Exception {
- return saslClient.evaluateChallenge(challenge);
- }
- });
- } catch (Exception e) {
- Throwables.throwIfInstanceOf(e, SaslException.class);
- throw new RuntimeException(e);
+ @Override
+ public byte[] run() throws SaslException {
+ return saslClient.evaluateChallenge(challenge);
+ }
+ });
+ } catch (PrivilegedActionException e) {
+ // This cast is safe because the action above only throws checked SaslException.
+ SaslException saslException = (SaslException) e.getCause();
+
+ // TODO(KUDU-2121): We should never get to this point if the client does not have
+ // Kerberos credentials, but it seems that on certain platforms it can happen.
+ // So, we try and determine whether the evaluateChallenge failed due to missing
+ // credentials, and return a nicer error message if so.
+ Throwable cause = saslException.getCause();
+ if (cause instanceof GSSException &&
+ ((GSSException) cause).getMajor() == GSSException.NO_CRED) {
+ throw new NonRecoverableException(
+ Status.ConfigurationError(
+ "Server requires Kerberos, but this client is not authenticated (kinit)"),
+ saslException);
+ }
+ throw saslException;
}
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/fcbfcc74/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java
index 35fc389..50b566a 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestSecurity.java
@@ -39,24 +39,22 @@ public class TestSecurity extends BaseKuduTest {
* be able to authenticate to the masters and tablet servers using tokens.
*/
@Test
- public void testImportExportAuthenticationCredentials() throws InterruptedException, Exception {
+ public void testImportExportAuthenticationCredentials() throws Exception {
byte[] authnData = client.exportAuthenticationCredentials().join();
assertNotNull(authnData);
String oldTicketCache = System.getProperty(SecurityUtil.KUDU_TICKETCACHE_PROPERTY);
System.clearProperty(SecurityUtil.KUDU_TICKETCACHE_PROPERTY);
try {
- KuduClient newClient = new KuduClient.KuduClientBuilder(masterAddresses)
- .defaultAdminOperationTimeoutMs(2000)
- .build();
+ KuduClient newClient = new KuduClient.KuduClientBuilder(masterAddresses).build();
// Test that a client with no credentials cannot list servers.
try {
newClient.listTabletServers();
Assert.fail("should not have been able to connect to a secure cluster " +
"with no credentials");
- } catch (Exception e) {
- // Expected.
- // TODO(todd): assert on the particular exception type and error string
+ } catch (NonRecoverableException e) {
+ Assert.assertTrue(e.getMessage().contains(
+ "Server requires Kerberos, but this client is not authenticated"));
}
// If we import the authentication data from the old authenticated client,
@@ -72,5 +70,4 @@ public class TestSecurity extends BaseKuduTest {
System.setProperty(SecurityUtil.KUDU_TICKETCACHE_PROPERTY, oldTicketCache);
}
}
-
}