You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by da...@apache.org on 2017/02/11 01:19:08 UTC
[1/2] kudu git commit: java: add TLS support
Repository: kudu
Updated Branches:
refs/heads/master 60aedaaf4 -> 41dd9d41a
java: add TLS support
This adds support for negotiating and speaking TLS in the Java client.
Testing is a bit light here, because the code on the C++ side to
generate and sign self-signed certs is still in flight, but once it's
committed, all of our integration tests should cover this code. I also
wrote one unit test specifically for the TLS handshake.
I tested this against a local cluster by running kudu-ts against a
tserver and master compiled with the latest in-flight C++ patches to
generate self-signed certs. I verified with tshark that the traffic is
now encrypted, whereas previously I could read it.
Change-Id: I9b095ccac908f391e683ec4ef433bae52d659a21
Reviewed-on: http://gerrit.cloudera.org:8080/5949
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/5833debe
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/5833debe
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/5833debe
Branch: refs/heads/master
Commit: 5833debe325a1db2a05ce9f5b2084fda7bba9449
Parents: 60aedaa
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Feb 8 11:12:18 2017 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Sat Feb 11 00:51:51 2017 +0000
----------------------------------------------------------------------
.../java/org/apache/kudu/client/Negotiator.java | 211 +++++++++++++++++--
.../java/org/apache/kudu/util/SecurityUtil.java | 46 ++++
.../org/apache/kudu/client/TestNegotiator.java | 113 +++++++++-
3 files changed, 348 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/5833debe/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
index 11cd8db..3b0385b 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
@@ -27,9 +27,12 @@
package org.apache.kudu.client;
import java.security.PrivilegedExceptionAction;
+import java.util.List;
import java.util.Map;
import java.util.Set;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLException;
import javax.security.auth.Subject;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
@@ -45,21 +48,32 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.ObjectArrays;
import com.google.common.collect.Sets;
+import com.google.protobuf.ByteString;
import com.google.protobuf.ZeroCopyLiteralByteString;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.handler.codec.embedder.DecoderEmbedder;
+import org.jboss.netty.handler.ssl.SslHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.kudu.annotations.InterfaceAudience;
import org.apache.kudu.rpc.RpcHeader;
+import org.apache.kudu.rpc.RpcHeader.NegotiatePB;
+import org.apache.kudu.rpc.RpcHeader.NegotiatePB.NegotiateStep;
import org.apache.kudu.rpc.RpcHeader.RpcFeatureFlag;
+import org.apache.kudu.util.SecurityUtil;
/**
* Netty Pipeline handler which runs connection negotiation with
@@ -69,10 +83,13 @@ import org.apache.kudu.rpc.RpcHeader.RpcFeatureFlag;
@InterfaceAudience.Private
public class Negotiator extends SimpleChannelUpstreamHandler {
- private static final Logger LOG = LoggerFactory.getLogger(TabletClient.class);
+ static final Logger LOG = LoggerFactory.getLogger(TabletClient.class);
- private static final Map<String, String> SASL_PROPS = ImmutableMap.of();
private static final SaslClientCallbackHandler SASL_CALLBACK = new SaslClientCallbackHandler();
+ private static final Set<RpcHeader.RpcFeatureFlag> SUPPORTED_RPC_FEATURES =
+ ImmutableSet.of(
+ RpcHeader.RpcFeatureFlag.APPLICATION_FEATURE_FLAGS,
+ RpcHeader.RpcFeatureFlag.TLS);
/**
* List of SASL mechanisms supported by the client, in descending priority order.
@@ -83,17 +100,46 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
static final String USER_AND_PASSWORD = "java_client";
- private boolean finished;
- private SaslClient saslClient;
static final int CONNECTION_CTX_CALL_ID = -3;
static final int SASL_CALL_ID = -33;
- private static final Set<RpcHeader.RpcFeatureFlag> SUPPORTED_RPC_FEATURES =
- ImmutableSet.of(RpcHeader.RpcFeatureFlag.APPLICATION_FEATURE_FLAGS);
- private Set<RpcHeader.RpcFeatureFlag> serverFeatures;
+ private static enum State {
+ INITIAL,
+ AWAIT_NEGOTIATE,
+ AWAIT_TLS_HANDSHAKE,
+ AWAIT_SASL,
+ FINISHED
+ }
+
+ /** Subject to authenticate as, when using Kerberos/GSSAPI */
private final Subject subject;
+ /** The remote hostname we're connecting to, used by TLS and GSSAPI */
private final String remoteHostname;
+ private State state = State.INITIAL;
+ private SaslClient saslClient;
+
+ /** The negotiated mechanism, set after NEGOTIATE stage. */
+ private String chosenMech;
+
+ /** The features supported by the server, set after NEGOTIATE stage. */
+ private Set<RpcHeader.RpcFeatureFlag> serverFeatures;
+
+ /**
+ * The negotiation protocol relies on tunneling the TLS handshake through
+ * protobufs. The embedder holds a Netty SslHandler which can perform the
+ * handshake. Once the handshake is complete, we will stop using the embedder
+ * and add the handler directly to the real ChannelPipeline.
+ * Only non-null once TLS is initiated.
+ */
+ private DecoderEmbedder<ChannelBuffer> sslEmbedder;
+
+ /**
+ * Future indicating whether the embedded handshake has completed.
+ * Only non-null once TLS is initiated.
+ */
+ private ChannelFuture sslHandshakeFuture;
+
public Negotiator(Subject subject, String remoteHostname) {
this.subject = subject;
this.remoteHostname = remoteHostname;
@@ -112,6 +158,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
}
builder.setStep(RpcHeader.NegotiatePB.NegotiateStep.NEGOTIATE);
+ state = State.AWAIT_NEGOTIATE;
sendSaslMessage(channel, builder.build());
}
@@ -134,14 +181,32 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
handleResponse(ctx.getChannel(), (CallResponse)m);
}
- private void handleResponse(Channel chan, CallResponse callResponse) throws SaslException {
+ private void handleResponse(Channel chan, CallResponse callResponse)
+ throws SaslException, SSLException {
// TODO(todd): this needs to handle error responses, not just success responses.
- Preconditions.checkState(!finished, "received a response after negotiation was complete");
RpcHeader.NegotiatePB response = parseSaslMsgResponse(callResponse);
- switch (response.getStep()) {
- case NEGOTIATE:
+
+ // TODO: check that the message type matches the expected one in all
+ // of the below implementations.
+ switch (state) {
+ case AWAIT_NEGOTIATE:
handleNegotiateResponse(chan, response);
break;
+ case AWAIT_SASL:
+ handleSaslMessage(chan, response);
+ break;
+ case AWAIT_TLS_HANDSHAKE:
+ handleTlsMessage(chan, response);
+ break;
+ default:
+ throw new IllegalStateException("received a message in unexpected state: " +
+ state.toString());
+ }
+ }
+
+ private void handleSaslMessage(Channel chan, NegotiatePB response)
+ throws SaslException {
+ switch (response.getStep()) {
case SASL_CHALLENGE:
handleChallengeResponse(chan, response);
break;
@@ -149,7 +214,8 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
handleSuccessResponse(chan);
break;
default:
- LOG.error(String.format("Wrong negotiation step: %s", response.getStep()));
+ throw new IllegalStateException("Wrong negotiation step: " +
+ response.getStep());
}
}
@@ -167,7 +233,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
}
private void handleNegotiateResponse(Channel chan, RpcHeader.NegotiatePB response) throws
- SaslException {
+ SaslException, SSLException {
// Store the supported features advertised by the server.
ImmutableSet.Builder<RpcHeader.RpcFeatureFlag> features = ImmutableSet.builder();
for (RpcHeader.RpcFeatureFlag feature : response.getSupportedFeaturesList()) {
@@ -177,6 +243,8 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
}
serverFeatures = features.build();
+ boolean willUseTls = serverFeatures.contains(RpcFeatureFlag.TLS);
+
// Gather the set of server-supported mechanisms.
Set<String> serverMechs = Sets.newHashSet();
for (RpcHeader.NegotiatePB.SaslMechanism mech : response.getSaslMechanismsList()) {
@@ -187,24 +255,27 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
// the server also supports them. If so, try to initialize saslClient.
// If we find a common mechanism that also can be successfully initialized,
// choose that mech.
- byte[] initialResponse = null;
- String chosenMech = null;
Map<String, String> errorsByMech = Maps.newHashMap();
for (String clientMech : PRIORITIZED_MECHS) {
if (!serverMechs.contains(clientMech)) {
errorsByMech.put(clientMech, "not advertised by server");
continue;
}
+ Map<String, String> props = Maps.newHashMap();
+ // If we are using GSSAPI with TLS, enable integrity protection, which we use
+ // to securely transmit the channel bindings. Channel bindings aren't
+ // yet implemented, but if we don't advertise 'auth-int', then the server
+ // won't talk to us.
+ if ("GSSAPI".equals(clientMech) && willUseTls) {
+ props.put(Sasl.QOP, "auth-int");
+ }
try {
saslClient = Sasl.createSaslClient(new String[]{ clientMech },
null,
"kudu",
remoteHostname,
- SASL_PROPS,
+ props,
SASL_CALLBACK);
- if (saslClient.hasInitialResponse()) {
- initialResponse = evaluateChallenge(new byte[0]);
- }
chosenMech = clientMech;
break;
} catch (SaslException e) {
@@ -219,12 +290,110 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
"]");
}
+ // If we negotiated TLS, then we want to start the TLS handshake.
+ if (willUseTls) {
+ startTlsHandshake(chan);
+ } else {
+ sendSaslInitiate(chan);
+ }
+ }
+
+ /**
+ * Send the initial TLS "ClientHello" message.
+ */
+ private void startTlsHandshake(Channel chan) throws SSLException {
+ SSLEngine engine = SecurityUtil.createSslEngine();
+ // TODO(todd): remove usage of this anonymous cipher suite.
+ // It's replaced in the next patch in this patch series by
+ // a self-signed cert used for tests.
+ engine.setEnabledCipherSuites(ObjectArrays.concat(
+ engine.getEnabledCipherSuites(),
+ "TLS_DH_anon_WITH_AES_128_CBC_SHA"));
+ engine.setUseClientMode(true);
+ SslHandler handler = new SslHandler(engine);
+ handler.setEnableRenegotiation(false);
+ sslEmbedder = new DecoderEmbedder<>(handler);
+ sslHandshakeFuture = handler.handshake();
+ state = State.AWAIT_TLS_HANDSHAKE;
+ boolean sent = sendPendingOutboundTls(chan);
+ assert sent;
+ }
+
+ /**
+ * Handle an inbound message during the TLS handshake. If this message
+ * causes the handshake to complete, triggers the beginning of SASL initiation.
+ */
+ private void handleTlsMessage(Channel chan, NegotiatePB response)
+ throws SaslException {
+ Preconditions.checkState(response.getStep() == NegotiateStep.TLS_HANDSHAKE);
+ Preconditions.checkArgument(!response.getTlsHandshake().isEmpty(),
+ "empty TLS message from server");
+
+ // Pass the TLS message into our embedded SslHandler.
+ sslEmbedder.offer(ChannelBuffers.copiedBuffer(
+ response.getTlsHandshake().asReadOnlyByteBuffer()));
+ if (sendPendingOutboundTls(chan)) {
+ // Data was sent -- we must continue the handshake process.
+ return;
+ }
+ // The handshake completed.
+ // Insert the SSL handler into the pipeline so that all following traffic
+ // gets encrypted, and then move on to the SASL portion of negotiation.
+ //
+ // NOTE: this takes effect immediately (i.e. the following SASL initiation
+ // sequence is encrypted).
+ chan.getPipeline().addFirst("tls", sslEmbedder.getPipeline().getFirst());
+ sendSaslInitiate(chan);
+ }
+
+ /**
+ * If the embedded SslHandler has data to send outbound, gather
+ * it all, send it tunneled in a NegotiatePB message, and return true.
+ *
+ * Otherwise, indicates that the handshake is complete by returning false.
+ */
+ private boolean sendPendingOutboundTls(Channel chan) {
+ // The SslHandler can generate multiple TLS messages in response
+ // (e.g. ClientKeyExchange, ChangeCipherSpec, ClientFinished).
+ // We poll the handler until it stops giving us buffers.
+ List<ByteString> bufs = Lists.newArrayList();
+ while (sslEmbedder.peek() != null) {
+ bufs.add(ByteString.copyFrom(sslEmbedder.poll().toByteBuffer()));
+ }
+ ByteString data = ByteString.copyFrom(bufs);
+ if (sslHandshakeFuture.isDone()) {
+ // TODO(danburkert): is this a correct assumption? would the
+ // client ever be "done" but also produce handshake data?
+ // if it did, would we want to encrypt the SSL message or no?
+ assert data.size() == 0;
+ return false;
+ } else {
+ assert data.size() > 0;
+ sendTunneledTls(chan, data);
+ return true;
+ }
+ }
+
+ /**
+ * Send a buffer of data for the TLS handshake, encapsulated in the
+ * appropriate TLS_HANDSHAKE negotiation message.
+ */
+ private void sendTunneledTls(Channel chan, ByteString buf) {
+ sendSaslMessage(chan, NegotiatePB.newBuilder()
+ .setStep(NegotiateStep.TLS_HANDSHAKE)
+ .setTlsHandshake(buf)
+ .build());
+ }
+
+ private void sendSaslInitiate(Channel chan) throws SaslException {
RpcHeader.NegotiatePB.Builder builder = RpcHeader.NegotiatePB.newBuilder();
- if (initialResponse != null) {
+ if (saslClient.hasInitialResponse()) {
+ byte[] initialResponse = evaluateChallenge(new byte[0]);
builder.setToken(ZeroCopyLiteralByteString.wrap(initialResponse));
}
builder.setStep(RpcHeader.NegotiatePB.NegotiateStep.SASL_INITIATE);
builder.addSaslMechanismsBuilder().setMechanism(chosenMech);
+ state = State.AWAIT_SASL;
sendSaslMessage(chan, builder.build());
}
@@ -241,7 +410,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
}
private void handleSuccessResponse(Channel chan) {
- finished = true;
+ state = State.FINISHED;
chan.getPipeline().remove(this);
Channels.write(chan, makeConnectionContext());
http://git-wip-us.apache.org/repos/asf/kudu/blob/5833debe/java/kudu-client/src/main/java/org/apache/kudu/util/SecurityUtil.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/util/SecurityUtil.java b/java/kudu-client/src/main/java/org/apache/kudu/util/SecurityUtil.java
index 68d9b00..163cf96 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/util/SecurityUtil.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/util/SecurityUtil.java
@@ -19,9 +19,16 @@ package org.apache.kudu.util;
import java.security.AccessControlContext;
import java.security.AccessController;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
import java.util.HashMap;
import java.util.Map;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
import javax.security.auth.Subject;
import javax.security.auth.kerberos.KerberosPrincipal;
import javax.security.auth.login.AppConfigurationEntry;
@@ -30,6 +37,8 @@ import javax.security.auth.login.LoginContext;
import javax.security.auth.login.LoginException;
import com.google.common.base.Joiner;
+import com.google.common.base.Throwables;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -100,4 +109,41 @@ public abstract class SecurityUtil {
}
}
+ /**
+ * TrustManager implementation which will trust any certificate.
+ * TODO(PKI): this needs to change so that it can be configured with
+ * the cluster's CA cert.
+ */
+ static class TrustAnyCert implements X509TrustManager {
+ @Override
+ public void checkClientTrusted(X509Certificate[] arg0, String arg1)
+ throws CertificateException {
+ }
+
+ @Override
+ public void checkServerTrusted(X509Certificate[] arg0, String arg1)
+ throws CertificateException {
+ }
+
+ @Override
+ public X509Certificate[] getAcceptedIssuers() {
+ return null;
+ }
+ }
+
+ /**
+ * Create an SSL engine configured to trust any certificate.
+ * @return
+ * @throws SSLException
+ */
+ public static SSLEngine createSslEngine() throws SSLException {
+ try {
+ SSLContext ctx = SSLContext.getInstance("TLS");
+ ctx.init(null, new TrustManager[] { new TrustAnyCert() }, null);
+ return ctx.createSSLEngine();
+ } catch (Exception e) {
+ Throwables.propagateIfInstanceOf(e, SSLException.class);
+ throw Throwables.propagate(e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/5833debe/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
index 1fc6ed9..616937e 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
@@ -19,22 +19,33 @@ package org.apache.kudu.client;
import static org.junit.Assert.*;
+import java.nio.ByteBuffer;
import java.security.AccessControlContext;
import java.security.AccessController;
+import java.util.List;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLEngineResult.HandshakeStatus;
+import javax.net.ssl.SSLException;
import javax.security.auth.Subject;
import org.apache.kudu.client.Negotiator.Result;
import org.apache.kudu.rpc.RpcHeader.ConnectionContextPB;
import org.apache.kudu.rpc.RpcHeader.NegotiatePB;
+import org.apache.kudu.rpc.RpcHeader.RpcFeatureFlag;
import org.apache.kudu.rpc.RpcHeader.NegotiatePB.NegotiateStep;
import org.apache.kudu.rpc.RpcHeader.NegotiatePB.SaslMechanism;
+import org.apache.kudu.util.SecurityUtil;
import org.apache.kudu.rpc.RpcHeader.ResponseHeader;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.handler.codec.embedder.DecoderEmbedder;
+import org.jboss.netty.handler.ssl.SslHandler;
import org.junit.Before;
import org.junit.Test;
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
public class TestNegotiator {
@@ -66,7 +77,7 @@ public class TestNegotiator {
RpcOutboundMessage msg = (RpcOutboundMessage) embedder.poll();
NegotiatePB body = (NegotiatePB) msg.getBody();
assertEquals(Negotiator.SASL_CALL_ID, msg.getHeader().getCallId());
- assertEquals(NegotiateStep.NEGOTIATE, ((NegotiatePB)msg.getBody()).getStep());
+ assertEquals(NegotiateStep.NEGOTIATE, body.getStep());
// Respond with NEGOTIATE.
embedder.offer(fakeResponse(
@@ -104,4 +115,104 @@ public class TestNegotiator {
assertNotNull(result);
}
+ private static void runTasks(SSLEngineResult result,
+ SSLEngine engine) {
+ if (result.getHandshakeStatus() != HandshakeStatus.NEED_TASK) {
+ return;
+ }
+ Runnable task;
+ while ((task = engine.getDelegatedTask()) != null) {
+ task.run();
+ }
+ }
+
+ private static CallResponse runServerStep(SSLEngine engine,
+ ByteString clientTlsMessage) throws SSLException {
+ Negotiator.LOG.info("Handling TLS message from client: {}", clientTlsMessage.toByteArray());
+ ByteBuffer dst = ByteBuffer.allocate(engine.getSession().getPacketBufferSize());
+ ByteBuffer src = clientTlsMessage.asReadOnlyByteBuffer();
+ do {
+ SSLEngineResult result = engine.unwrap(src, dst);
+ runTasks(result, engine);
+ } while (engine.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_UNWRAP);
+
+ if (engine.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_WRAP) {
+ // The server has more to send.
+ // Produce the ServerHello and send it back to the client.
+ List<ByteString> bufs = Lists.newArrayList();
+ while (engine.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_WRAP) {
+ dst.clear();
+ runTasks(engine.wrap(ByteBuffer.allocate(0), dst), engine);
+ dst.flip();
+ bufs.add(ByteString.copyFrom(dst));
+ }
+ return fakeResponse(
+ ResponseHeader.newBuilder().setCallId(-33).build(),
+ NegotiatePB.newBuilder()
+ .setTlsHandshake(ByteString.copyFrom(bufs))
+ .setStep(NegotiateStep.TLS_HANDSHAKE)
+ .build());
+ } else if (engine.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING) {
+ // Handshake complete.
+ return null;
+ } else {
+ throw new AssertionError("unexpected state: " + engine.getHandshakeStatus());
+ }
+ }
+
+ @Test
+ public void testTlsNegotiation() throws Exception {
+ SSLEngine serverEngine = SecurityUtil.createSslEngine();
+ serverEngine.setUseClientMode(false);
+ // Enable only an anonymous cipher suite, so we don't need to deal with
+ // setting up certs in this test, for now.
+ serverEngine.setEnabledCipherSuites(new String[]{
+ "TLS_DH_anon_WITH_AES_128_CBC_SHA"
+ });
+
+ negotiator.sendHello(embedder.getPipeline().getChannel());
+
+ // Expect client->server: NEGOTIATE, TLS included.
+ RpcOutboundMessage msg = (RpcOutboundMessage) embedder.poll();
+ NegotiatePB body = (NegotiatePB) msg.getBody();
+ assertEquals(NegotiateStep.NEGOTIATE, body.getStep());
+ assertTrue(body.getSupportedFeaturesList().contains(RpcFeatureFlag.TLS));
+
+ // Fake a server response with TLS enabled.
+ embedder.offer(fakeResponse(
+ ResponseHeader.newBuilder().setCallId(-33).build(),
+ NegotiatePB.newBuilder()
+ .addSaslMechanisms(NegotiatePB.SaslMechanism.newBuilder().setMechanism("PLAIN"))
+ .addSupportedFeatures(RpcFeatureFlag.TLS)
+ .setStep(NegotiateStep.NEGOTIATE)
+ .build()));
+
+ // Expect client->server: TLS_HANDSHAKE.
+ msg = (RpcOutboundMessage) embedder.poll();
+ body = (NegotiatePB) msg.getBody();
+ assertEquals(NegotiateStep.TLS_HANDSHAKE, body.getStep());
+
+ // Consume the ClientHello in our fake server, which should generate ServerHello.
+ embedder.offer(runServerStep(serverEngine, body.getTlsHandshake()));
+
+ // Expect client to generate ClientKeyExchange, ChangeCipherSpec, Finished.
+ msg = (RpcOutboundMessage) embedder.poll();
+ body = (NegotiatePB) msg.getBody();
+ assertEquals(NegotiateStep.TLS_HANDSHAKE, body.getStep());
+
+ // Server consumes the above. Should send the TLS "Finished" message.
+ embedder.offer(runServerStep(serverEngine, body.getTlsHandshake()));
+
+ // The pipeline should now have an SSL handler as the first handler.
+ assertTrue(embedder.getPipeline().getFirst() instanceof SslHandler);
+
+ // The Negotiator should have sent the SASL_INITIATE at this point.
+ // NOTE: in a non-mock environment, this message would now be encrypted
+ // by the newly-added TLS handler. But, with the DecoderEmbedder that we're
+ // using, we don't actually end up processing outbound events. Upgrading
+ // to Netty 4 and using EmbeddedChannel instead would make this more realistic.
+ msg = (RpcOutboundMessage) embedder.poll();
+ body = (NegotiatePB) msg.getBody();
+ assertEquals(NegotiateStep.SASL_INITIATE, body.getStep());
+ }
}
[2/2] kudu git commit: java: implement Channel Bindings
Posted by da...@apache.org.
java: implement Channel Bindings
This adds a utility function for calculating RFC 5929
"tls-server-endpoint" channel bindings in Java. It also hooks
channel bindings verification into the Negotiator implementation.
A new simple unit test verifies that channel bindings can be calculated
from a cert.
I also added a Java-format KeyStore with a self-signed cert as a test
resource. This was useful for writing a unit test. I generated it based
on the certs found in the C++ source using the following:
openssl pkcs12 -export -in /tmp/cert.pem -inkey /tmp/key.pem > /tmp/p12
keytool -importkeystore -srckeystore /tmp/p12 -destkeystore java/kudu-client/src/test/resources/test-key-and-cert.jks
No new unit test exercises the channel bindings verification itself,
but I did confirm that the integrity check works by temporarily
flipping a byte before unwrapping and making sure that verification
failed.
Change-Id: I8b604ea6a0cff55820f7fbbb3ba4beba3a888a48
Reviewed-on: http://gerrit.cloudera.org:8080/5953
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/41dd9d41
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/41dd9d41
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/41dd9d41
Branch: refs/heads/master
Commit: 41dd9d41a1e6a8a0b60104e26ed1f0ece04158e6
Parents: 5833deb
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Feb 8 17:35:48 2017 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Sat Feb 11 00:51:58 2017 +0000
----------------------------------------------------------------------
.../java/org/apache/kudu/client/Negotiator.java | 64 +++++++++++++++----
.../java/org/apache/kudu/util/SecurityUtil.java | 57 +++++++++++++++++
.../org/apache/kudu/client/TestNegotiator.java | 44 ++++++++++---
.../src/test/resources/test-key-and-cert.jks | Bin 0 -> 2212 bytes
4 files changed, 143 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/41dd9d41/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
index 3b0385b..eba71a3 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
@@ -27,12 +27,14 @@
package org.apache.kudu.client;
import java.security.PrivilegedExceptionAction;
+import java.security.cert.Certificate;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLPeerUnverifiedException;
import javax.security.auth.Subject;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
@@ -46,11 +48,9 @@ import javax.security.sasl.SaslException;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.google.common.collect.ObjectArrays;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import com.google.protobuf.ZeroCopyLiteralByteString;
@@ -140,6 +140,8 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
*/
private ChannelFuture sslHandshakeFuture;
+ private Certificate peerCert;
+
public Negotiator(Subject subject, String remoteHostname) {
this.subject = subject;
this.remoteHostname = remoteHostname;
@@ -211,7 +213,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
handleChallengeResponse(chan, response);
break;
case SASL_SUCCESS:
- handleSuccessResponse(chan);
+ handleSuccessResponse(chan, response);
break;
default:
throw new IllegalStateException("Wrong negotiation step: " +
@@ -263,9 +265,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
}
Map<String, String> props = Maps.newHashMap();
// If we are using GSSAPI with TLS, enable integrity protection, which we use
- // to securely transmit the channel bindings. Channel bindings aren't
- // yet implemented, but if we don't advertise 'auth-int', then the server
- // won't talk to us.
+ // to securely transmit the channel bindings.
if ("GSSAPI".equals(clientMech) && willUseTls) {
props.put(Sasl.QOP, "auth-int");
}
@@ -303,12 +303,6 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
*/
private void startTlsHandshake(Channel chan) throws SSLException {
SSLEngine engine = SecurityUtil.createSslEngine();
- // TODO(todd): remove usage of this anonymous cipher suite.
- // It's replaced in the next patch in this patch series by
- // a self-signed cert used for tests.
- engine.setEnabledCipherSuites(ObjectArrays.concat(
- engine.getEnabledCipherSuites(),
- "TLS_DH_anon_WITH_AES_128_CBC_SHA"));
engine.setUseClientMode(true);
SslHandler handler = new SslHandler(engine);
handler.setEnableRenegotiation(false);
@@ -342,7 +336,17 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
//
// NOTE: this takes effect immediately (i.e. the following SASL initiation
// sequence is encrypted).
- chan.getPipeline().addFirst("tls", sslEmbedder.getPipeline().getFirst());
+ SslHandler handler = (SslHandler)sslEmbedder.getPipeline().getFirst();
+ try {
+ Certificate[] certs = handler.getEngine().getSession().getPeerCertificates();
+ if (certs.length == 0) {
+ throw new SSLPeerUnverifiedException("no peer cert found");
+ }
+ } catch (SSLPeerUnverifiedException e) {
+ throw Throwables.propagate(e);
+ }
+
+ chan.getPipeline().addFirst("tls", handler);
sendSaslInitiate(chan);
}
@@ -362,6 +366,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
}
ByteString data = ByteString.copyFrom(bufs);
if (sslHandshakeFuture.isDone()) {
+ // TODO(todd): should check sslHandshakeFuture.isSuccess()
// TODO(danburkert): is this a correct assumption? would the
// client ever be "done" but also produce handshake data?
// if it did, would we want to encrypt the SSL message or no?
@@ -409,7 +414,38 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
sendSaslMessage(chan, builder.build());
}
- private void handleSuccessResponse(Channel chan) {
+ /**
+ * Verify the channel bindings included in 'response'. This is used only
+ * for GSSAPI-authenticated connections over TLS.
+ * @throws RuntimeException on failure to verify
+ */
+ private void verifyChannelBindings(NegotiatePB response) {
+ try {
+ byte[] expected = SecurityUtil.getEndpointChannelBindings(peerCert);
+ if (!response.hasChannelBindings()) {
+ throw new RuntimeException("no channel bindings provided by remote peer");
+ }
+ byte[] provided = response.getChannelBindings().toByteArray();
+ // NOTE: the C SASL library's implementation of sasl_encode() actually
+ // includes a length prefix. Java's equivalents do not. So, we have to
+ // chop off the length prefix here before unwrapping.
+ if (provided.length < 4) {
+ throw new RuntimeException("invalid too-short channel bindings");
+ }
+ byte[] unwrapped = saslClient.unwrap(provided, 4, provided.length - 4);
+ if (!Bytes.equals(expected, unwrapped)) {
+ throw new RuntimeException("invalid channel bindings provided by remote peer");
+ }
+ } catch (SaslException se) {
+ throw Throwables.propagate(se);
+ }
+ }
+
+ private void handleSuccessResponse(Channel chan, NegotiatePB response) {
+ if (peerCert != null && "GSSAPI".equals(chosenMech)) {
+ verifyChannelBindings(response);
+ }
+
state = State.FINISHED;
chan.getPipeline().remove(this);
http://git-wip-us.apache.org/repos/asf/kudu/blob/41dd9d41/java/kudu-client/src/main/java/org/apache/kudu/util/SecurityUtil.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/util/SecurityUtil.java b/java/kudu-client/src/main/java/org/apache/kudu/util/SecurityUtil.java
index 163cf96..37e4302 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/util/SecurityUtil.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/util/SecurityUtil.java
@@ -19,6 +19,8 @@ package org.apache.kudu.util;
import java.security.AccessControlContext;
import java.security.AccessController;
+import java.security.MessageDigest;
+import java.security.cert.Certificate;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.HashMap;
@@ -37,7 +39,9 @@ import javax.security.auth.login.LoginContext;
import javax.security.auth.login.LoginException;
import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,6 +54,26 @@ public abstract class SecurityUtil {
public static final String KUDU_TICKETCACHE_PROPERTY = "kudu.krb5ccname";
/**
+ * Map from the names of digest algorithms used in X509 certificates to
+ * the appropriate MessageDigest implementation to use for channel-bindings.
+ * Keys are the upper-cased portion of the certificate's signature algorithm
+ * name that precedes "with" (for example "SHA256" for "SHA256withRSA").
+ */
+ private static final Map<String, String> CERT_DIGEST_TO_MESSAGE_DIGEST =
+ ImmutableMap.<String, String>builder()
+ // RFC 5929: if the certificate's signatureAlgorithm uses a single hash
+ // function, and that hash function is either MD5 [RFC1321] or SHA-1
+ // [RFC3174], then use SHA-256 [FIPS-180-3];
+ .put("MD5", "SHA-256")
+ .put("SHA1", "SHA-256")
+ // For other algorithms, use the provided hash function.
+ .put("SHA224", "SHA-224")
+ .put("SHA256", "SHA-256")
+ .put("SHA384", "SHA-384")
+ .put("SHA512", "SHA-512")
+ // The above list is exhaustive as of JDK8's implementation of
+ // SignatureAndHashAlgorithm.
+ .build();
+
+ /**
* Return the Subject associated with the current thread's AccessController,
* if that subject has Kerberos credentials. If there is no such subject, or
* the subject has no Kerberos credentials, logins in a new subject from the
@@ -110,6 +134,39 @@ public abstract class SecurityUtil {
}
/**
+ * Compute the "tls-server-endpoint" channel binding data for the given X509
+ * certificate. The algorithm is specified in RFC 5929.
+ *
+ * @param cert the certificate to compute bindings for; must be an X509Certificate
+ * @return the expected channel bindings for the certificate
+ * @throws RuntimeException if the certificate is not an X509 cert, or if
+ * it uses a signature type for which we cannot compute channel bindings
+ */
+ public static byte[] getEndpointChannelBindings(Certificate cert) {
+ Preconditions.checkArgument(cert instanceof X509Certificate,
+ "can only handle X509 certs");
+ X509Certificate x509 = (X509Certificate)cert;
+ String sigAlg = x509.getSigAlgName();
+
+ // The signature algorithm name is a string like 'SHA256withRSA'.
+ // There's no API available to actually find just the digest algorithm,
+ // so we resort to some hackery: take the part before "with" and look it
+ // up in CERT_DIGEST_TO_MESSAGE_DIGEST.
+ String[] components = sigAlg.split("with");
+ String digestAlg = CERT_DIGEST_TO_MESSAGE_DIGEST.get(components[0].toUpperCase());
+ if (digestAlg == null) {
+ // RFC 5929: if the certificate's signatureAlgorithm uses no hash functions or
+ // uses multiple hash functions, then this channel binding type's channel
+ // bindings are undefined at this time (updates to this channel binding type may
+ // occur to address this issue if it ever arises).
+ throw new RuntimeException("cert uses unknown signature algorithm: " + sigAlg);
+ }
+ try {
+ // The channel binding value is the digest of the encoded certificate.
+ return MessageDigest.getInstance(digestAlg).digest(cert.getEncoded());
+ } catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ /**
* TrustManager implementation which will trust any certificate.
* TODO(PKI): this needs to change so that it can be configured with
* the cluster's CA cert.
http://git-wip-us.apache.org/repos/asf/kudu/blob/41dd9d41/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
index 616937e..880ab5b 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
@@ -22,8 +22,12 @@ import static org.junit.Assert.*;
import java.nio.ByteBuffer;
import java.security.AccessControlContext;
import java.security.AccessController;
+import java.security.KeyStore;
+import java.security.cert.Certificate;
import java.util.List;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
@@ -34,9 +38,9 @@ import org.apache.kudu.client.Negotiator.Result;
import org.apache.kudu.rpc.RpcHeader.ConnectionContextPB;
import org.apache.kudu.rpc.RpcHeader.NegotiatePB;
import org.apache.kudu.rpc.RpcHeader.RpcFeatureFlag;
+import org.apache.kudu.util.SecurityUtil;
import org.apache.kudu.rpc.RpcHeader.NegotiatePB.NegotiateStep;
import org.apache.kudu.rpc.RpcHeader.NegotiatePB.SaslMechanism;
-import org.apache.kudu.util.SecurityUtil;
import org.apache.kudu.rpc.RpcHeader.ResponseHeader;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.handler.codec.embedder.DecoderEmbedder;
@@ -44,6 +48,7 @@ import org.jboss.netty.handler.ssl.SslHandler;
import org.junit.Before;
import org.junit.Test;
+import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
@@ -52,6 +57,8 @@ public class TestNegotiator {
private Negotiator negotiator;
private DecoderEmbedder<Object> embedder;
+ private static final char[] password = "password".toCharArray();
+
@Before
public void setup() {
AccessControlContext context = AccessController.getContext();
@@ -66,6 +73,33 @@ public class TestNegotiator {
return new CallResponse(buf);
}
+ KeyStore loadTestKeystore() throws Exception { // Loads the JKS test keystore from the test classpath.
+ KeyStore ks = KeyStore.getInstance("JKS");
+ ks.load(TestNegotiator.class.getResourceAsStream("/test-key-and-cert.jks"), // NOTE(review): stream is never closed, and would be null if the resource were missing
+ password);
+ return ks;
+ }
+
+ SSLEngine createServerEngine() { // Builds an SSLEngine whose key material comes from the test keystore.
+ try {
+ KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
+ kmf.init(loadTestKeystore(), password);
+ SSLContext ctx = SSLContext.getInstance("TLS");
+ ctx.init(kmf.getKeyManagers(), null, null); // null trust managers / SecureRandom: fall back to the JDK defaults
+ return ctx.createSSLEngine();
+ } catch (Exception e) {
+ throw Throwables.propagate(e); // rethrow unchecked so test callers need no throws clause
+ }
+ }
+
+ @Test
+ public void testChannelBinding() throws Exception {
+ KeyStore ks = loadTestKeystore();
+ Certificate cert = ks.getCertificate("1"); // "1" is the keystore alias looked up for the test certificate
+ byte[] bindings = SecurityUtil.getEndpointChannelBindings(cert);
+ assertEquals(32, bindings.length); // 32 bytes is the length of a SHA-256 digest
+ }
+
/**
* Simple test case for a PLAIN negotiation.
*/
@@ -162,14 +196,8 @@ public class TestNegotiator {
@Test
public void testTlsNegotiation() throws Exception {
- SSLEngine serverEngine = SecurityUtil.createSslEngine();
+ SSLEngine serverEngine = createServerEngine();
serverEngine.setUseClientMode(false);
- // Enable only an anonymous cipher suite, so we don't need to deal with
- // setting up certs in this test, for now.
- serverEngine.setEnabledCipherSuites(new String[]{
- "TLS_DH_anon_WITH_AES_128_CBC_SHA"
- });
-
negotiator.sendHello(embedder.getPipeline().getChannel());
// Expect client->server: NEGOTIATE, TLS included.
http://git-wip-us.apache.org/repos/asf/kudu/blob/41dd9d41/java/kudu-client/src/test/resources/test-key-and-cert.jks
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/resources/test-key-and-cert.jks b/java/kudu-client/src/test/resources/test-key-and-cert.jks
new file mode 100644
index 0000000..45d885d
Binary files /dev/null and b/java/kudu-client/src/test/resources/test-key-and-cert.jks differ