You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by da...@apache.org on 2017/02/11 01:19:08 UTC

[1/2] kudu git commit: java: add TLS support

Repository: kudu
Updated Branches:
  refs/heads/master 60aedaaf4 -> 41dd9d41a


java: add TLS support

This adds support for negotiating and speaking TLS in the Java client.

Testing is a bit light here, because the code on the C++ side to
generate and sign self-signed certs is still in flight, but once it's
committed, all of our integration tests should cover this code. I also
did write one unit test for the TLS handshake in particular.

I did test this against a local cluster by running kudu-ts against a
tserver and master compiled with the latest in-flight C++ patches to
generate self-signed certs. I verified with tshark that the traffic is
encrypted whereas I could read it before.

Change-Id: I9b095ccac908f391e683ec4ef433bae52d659a21
Reviewed-on: http://gerrit.cloudera.org:8080/5949
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/5833debe
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/5833debe
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/5833debe

Branch: refs/heads/master
Commit: 5833debe325a1db2a05ce9f5b2084fda7bba9449
Parents: 60aedaa
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Feb 8 11:12:18 2017 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Sat Feb 11 00:51:51 2017 +0000

----------------------------------------------------------------------
 .../java/org/apache/kudu/client/Negotiator.java | 211 +++++++++++++++++--
 .../java/org/apache/kudu/util/SecurityUtil.java |  46 ++++
 .../org/apache/kudu/client/TestNegotiator.java  | 113 +++++++++-
 3 files changed, 348 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/5833debe/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
index 11cd8db..3b0385b 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
@@ -27,9 +27,12 @@
 package org.apache.kudu.client;
 
 import java.security.PrivilegedExceptionAction;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLException;
 import javax.security.auth.Subject;
 import javax.security.auth.callback.Callback;
 import javax.security.auth.callback.CallbackHandler;
@@ -45,21 +48,32 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.ObjectArrays;
 import com.google.common.collect.Sets;
+import com.google.protobuf.ByteString;
 import com.google.protobuf.ZeroCopyLiteralByteString;
 
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
 import org.jboss.netty.channel.ChannelHandlerContext;
 import org.jboss.netty.channel.Channels;
 import org.jboss.netty.channel.MessageEvent;
 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.handler.codec.embedder.DecoderEmbedder;
+import org.jboss.netty.handler.ssl.SslHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.kudu.annotations.InterfaceAudience;
 import org.apache.kudu.rpc.RpcHeader;
+import org.apache.kudu.rpc.RpcHeader.NegotiatePB;
+import org.apache.kudu.rpc.RpcHeader.NegotiatePB.NegotiateStep;
 import org.apache.kudu.rpc.RpcHeader.RpcFeatureFlag;
+import org.apache.kudu.util.SecurityUtil;
 
 /**
  * Netty Pipeline handler which runs connection negotiation with
@@ -69,10 +83,13 @@ import org.apache.kudu.rpc.RpcHeader.RpcFeatureFlag;
 @InterfaceAudience.Private
 public class Negotiator extends SimpleChannelUpstreamHandler {
 
-  private static final Logger LOG = LoggerFactory.getLogger(TabletClient.class);
+  static final Logger LOG = LoggerFactory.getLogger(TabletClient.class);
 
-  private static final Map<String, String> SASL_PROPS = ImmutableMap.of();
   private static final SaslClientCallbackHandler SASL_CALLBACK = new SaslClientCallbackHandler();
+  private static final Set<RpcHeader.RpcFeatureFlag> SUPPORTED_RPC_FEATURES =
+      ImmutableSet.of(
+          RpcHeader.RpcFeatureFlag.APPLICATION_FEATURE_FLAGS,
+          RpcHeader.RpcFeatureFlag.TLS);
 
   /**
    * List of SASL mechanisms supported by the client, in descending priority order.
@@ -83,17 +100,46 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
 
   static final String USER_AND_PASSWORD = "java_client";
 
-  private boolean finished;
-  private SaslClient saslClient;
   static final int CONNECTION_CTX_CALL_ID = -3;
   static final int SASL_CALL_ID = -33;
-  private static final Set<RpcHeader.RpcFeatureFlag> SUPPORTED_RPC_FEATURES =
-      ImmutableSet.of(RpcHeader.RpcFeatureFlag.APPLICATION_FEATURE_FLAGS);
-  private Set<RpcHeader.RpcFeatureFlag> serverFeatures;
 
+  private static enum State {
+    INITIAL,
+    AWAIT_NEGOTIATE,
+    AWAIT_TLS_HANDSHAKE,
+    AWAIT_SASL,
+    FINISHED
+  }
+
+  /** Subject to authenticate as, when using Kerberos/GSSAPI */
   private final Subject subject;
+  /** The remote hostname we're connecting to, used by TLS and GSSAPI */
   private final String remoteHostname;
 
+  private State state = State.INITIAL;
+  private SaslClient saslClient;
+
+  /** The negotiated mechanism, set after NEGOTIATE stage. */
+  private String chosenMech;
+
+  /** The features supported by the server, set after NEGOTIATE stage. */
+  private Set<RpcHeader.RpcFeatureFlag> serverFeatures;
+
+  /**
+   * The negotiation protocol relies on tunneling the TLS handshake through
+   * protobufs. The embedder holds a Netty SslHandler which can perform the
+   * handshake. Once the handshake is complete, we will stop using the embedder
+   * and add the handler directly to the real ChannelPipeline.
+   * Only non-null once TLS is initiated.
+   */
+  private DecoderEmbedder<ChannelBuffer> sslEmbedder;
+
+  /**
+   * Future indicating whether the embedded handshake has completed.
+   * Only non-null once TLS is initiated.
+   */
+  private ChannelFuture sslHandshakeFuture;
+
   public Negotiator(Subject subject, String remoteHostname) {
     this.subject = subject;
     this.remoteHostname = remoteHostname;
@@ -112,6 +158,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
     }
 
     builder.setStep(RpcHeader.NegotiatePB.NegotiateStep.NEGOTIATE);
+    state = State.AWAIT_NEGOTIATE;
     sendSaslMessage(channel, builder.build());
   }
 
@@ -134,14 +181,32 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
     handleResponse(ctx.getChannel(), (CallResponse)m);
   }
 
-  private void handleResponse(Channel chan, CallResponse callResponse) throws SaslException {
+  private void handleResponse(Channel chan, CallResponse callResponse)
+      throws SaslException, SSLException {
     // TODO(todd): this needs to handle error responses, not just success responses.
-    Preconditions.checkState(!finished, "received a response after negotiation was complete");
     RpcHeader.NegotiatePB response = parseSaslMsgResponse(callResponse);
-    switch (response.getStep()) {
-      case NEGOTIATE:
+
+    // TODO: check that the message type matches the expected one in all
+    // of the below implementations.
+    switch (state) {
+      case AWAIT_NEGOTIATE:
         handleNegotiateResponse(chan, response);
         break;
+      case AWAIT_SASL:
+        handleSaslMessage(chan, response);
+        break;
+      case AWAIT_TLS_HANDSHAKE:
+        handleTlsMessage(chan, response);
+        break;
+      default:
+        throw new IllegalStateException("received a message in unexpected state: " +
+            state.toString());
+    }
+  }
+
+  private void handleSaslMessage(Channel chan, NegotiatePB response)
+      throws SaslException {
+    switch (response.getStep()) {
       case SASL_CHALLENGE:
         handleChallengeResponse(chan, response);
         break;
@@ -149,7 +214,8 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
         handleSuccessResponse(chan);
         break;
       default:
-        LOG.error(String.format("Wrong negotiation step: %s", response.getStep()));
+        throw new IllegalStateException("Wrong negotiation step: " +
+            response.getStep());
     }
   }
 
@@ -167,7 +233,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
   }
 
   private void handleNegotiateResponse(Channel chan, RpcHeader.NegotiatePB response) throws
-      SaslException {
+      SaslException, SSLException {
     // Store the supported features advertised by the server.
     ImmutableSet.Builder<RpcHeader.RpcFeatureFlag> features = ImmutableSet.builder();
     for (RpcHeader.RpcFeatureFlag feature : response.getSupportedFeaturesList()) {
@@ -177,6 +243,8 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
     }
     serverFeatures = features.build();
 
+    boolean willUseTls = serverFeatures.contains(RpcFeatureFlag.TLS);
+
     // Gather the set of server-supported mechanisms.
     Set<String> serverMechs = Sets.newHashSet();
     for (RpcHeader.NegotiatePB.SaslMechanism mech : response.getSaslMechanismsList()) {
@@ -187,24 +255,27 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
     // the server also supports them. If so, try to initialize saslClient.
     // If we find a common mechanism that also can be successfully initialized,
     // choose that mech.
-    byte[] initialResponse = null;
-    String chosenMech = null;
     Map<String, String> errorsByMech = Maps.newHashMap();
     for (String clientMech : PRIORITIZED_MECHS) {
       if (!serverMechs.contains(clientMech)) {
         errorsByMech.put(clientMech, "not advertised by server");
         continue;
       }
+      Map<String, String> props = Maps.newHashMap();
+      // If we are using GSSAPI with TLS, enable integrity protection, which we use
+      // to securely transmit the channel bindings. Channel bindings aren't
+      // yet implemented, but if we don't advertise 'auth-int', then the server
+      // won't talk to us.
+      if ("GSSAPI".equals(clientMech) && willUseTls) {
+        props.put(Sasl.QOP, "auth-int");
+      }
       try {
         saslClient = Sasl.createSaslClient(new String[]{ clientMech },
                                            null,
                                            "kudu",
                                            remoteHostname,
-                                           SASL_PROPS,
+                                           props,
                                            SASL_CALLBACK);
-        if (saslClient.hasInitialResponse()) {
-          initialResponse = evaluateChallenge(new byte[0]);
-        }
         chosenMech = clientMech;
         break;
       } catch (SaslException e) {
@@ -219,12 +290,110 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
                               "]");
     }
 
+    // If we negotiated TLS, then we want to start the TLS handshake.
+    if (willUseTls) {
+      startTlsHandshake(chan);
+    } else {
+      sendSaslInitiate(chan);
+    }
+  }
+
+  /**
+   * Send the initial TLS "ClientHello" message.
+   */
+  private void startTlsHandshake(Channel chan) throws SSLException {
+    SSLEngine engine = SecurityUtil.createSslEngine();
+    // TODO(todd): remove usage of this anonymous cipher suite.
+    // It's replaced in the next patch in this patch series by
+    // a self-signed cert used for tests.
+    engine.setEnabledCipherSuites(ObjectArrays.concat(
+        engine.getEnabledCipherSuites(),
+        "TLS_DH_anon_WITH_AES_128_CBC_SHA"));
+    engine.setUseClientMode(true);
+    SslHandler handler = new SslHandler(engine);
+    handler.setEnableRenegotiation(false);
+    sslEmbedder = new DecoderEmbedder<>(handler);
+    sslHandshakeFuture = handler.handshake();
+    state = State.AWAIT_TLS_HANDSHAKE;
+    boolean sent = sendPendingOutboundTls(chan);
+    assert sent;
+  }
+
+  /**
+   * Handle an inbound message during the TLS handshake. If this message
+   * causes the handshake to complete, triggers the beginning of SASL initiation.
+   */
+  private void handleTlsMessage(Channel chan, NegotiatePB response)
+      throws SaslException {
+    Preconditions.checkState(response.getStep() == NegotiateStep.TLS_HANDSHAKE);
+    Preconditions.checkArgument(!response.getTlsHandshake().isEmpty(),
+        "empty TLS message from server");
+
+    // Pass the TLS message into our embedded SslHandler.
+    sslEmbedder.offer(ChannelBuffers.copiedBuffer(
+        response.getTlsHandshake().asReadOnlyByteBuffer()));
+    if (sendPendingOutboundTls(chan)) {
+      // Data was sent -- we must continue the handshake process.
+      return;
+    }
+    // The handshake completed.
+    // Insert the SSL handler into the pipeline so that all following traffic
+    // gets encrypted, and then move on to the SASL portion of negotiation.
+    //
+    // NOTE: this takes effect immediately (i.e. the following SASL initiation
+    // sequence is encrypted).
+    chan.getPipeline().addFirst("tls", sslEmbedder.getPipeline().getFirst());
+    sendSaslInitiate(chan);
+  }
+
+  /**
+   * If the embedded SslHandler has data to send outbound, gather
+   * it all, send it tunneled in a NegotiatePB message, and return true.
+   *
+   * Otherwise, indicates that the handshake is complete by returning false.
+   */
+  private boolean sendPendingOutboundTls(Channel chan) {
+    // The SslHandler can generate multiple TLS messages in response
+    // (e.g. ClientKeyExchange, ChangeCipherSpec, ClientFinished).
+    // We poll the handler until it stops giving us buffers.
+    List<ByteString> bufs = Lists.newArrayList();
+    while (sslEmbedder.peek() != null) {
+      bufs.add(ByteString.copyFrom(sslEmbedder.poll().toByteBuffer()));
+    }
+    ByteString data = ByteString.copyFrom(bufs);
+    if (sslHandshakeFuture.isDone()) {
+      // TODO(danburkert): is this a correct assumption? would the
+      // client ever be "done" but also produce handshake data?
+      // if it did, would we want to encrypt the SSL message or no?
+      assert data.size() == 0;
+      return false;
+    } else {
+      assert data.size() > 0;
+      sendTunneledTls(chan, data);
+      return true;
+    }
+  }
+
+  /**
+   * Send a buffer of data for the TLS handshake, encapsulated in the
+   * appropriate TLS_HANDSHAKE negotiation message.
+   */
+  private void sendTunneledTls(Channel chan, ByteString buf) {
+    sendSaslMessage(chan, NegotiatePB.newBuilder()
+        .setStep(NegotiateStep.TLS_HANDSHAKE)
+        .setTlsHandshake(buf)
+        .build());
+  }
+
+  private void sendSaslInitiate(Channel chan) throws SaslException {
     RpcHeader.NegotiatePB.Builder builder = RpcHeader.NegotiatePB.newBuilder();
-    if (initialResponse != null) {
+    if (saslClient.hasInitialResponse()) {
+      byte[] initialResponse = evaluateChallenge(new byte[0]);
       builder.setToken(ZeroCopyLiteralByteString.wrap(initialResponse));
     }
     builder.setStep(RpcHeader.NegotiatePB.NegotiateStep.SASL_INITIATE);
     builder.addSaslMechanismsBuilder().setMechanism(chosenMech);
+    state = State.AWAIT_SASL;
     sendSaslMessage(chan, builder.build());
   }
 
@@ -241,7 +410,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
   }
 
   private void handleSuccessResponse(Channel chan) {
-    finished = true;
+    state = State.FINISHED;
     chan.getPipeline().remove(this);
 
     Channels.write(chan, makeConnectionContext());

http://git-wip-us.apache.org/repos/asf/kudu/blob/5833debe/java/kudu-client/src/main/java/org/apache/kudu/util/SecurityUtil.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/util/SecurityUtil.java b/java/kudu-client/src/main/java/org/apache/kudu/util/SecurityUtil.java
index 68d9b00..163cf96 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/util/SecurityUtil.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/util/SecurityUtil.java
@@ -19,9 +19,16 @@ package org.apache.kudu.util;
 
 import java.security.AccessControlContext;
 import java.security.AccessController;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
 import java.util.HashMap;
 import java.util.Map;
 
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
 import javax.security.auth.Subject;
 import javax.security.auth.kerberos.KerberosPrincipal;
 import javax.security.auth.login.AppConfigurationEntry;
@@ -30,6 +37,8 @@ import javax.security.auth.login.LoginContext;
 import javax.security.auth.login.LoginException;
 
 import com.google.common.base.Joiner;
+import com.google.common.base.Throwables;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -100,4 +109,41 @@ public abstract class SecurityUtil {
     }
   }
 
+  /**
+   * TrustManager implementation which will trust any certificate.
+   * TODO(PKI): this needs to change so that it can be configured with
+   * the cluster's CA cert.
+   */
+  static class TrustAnyCert implements X509TrustManager {
+    @Override
+    public void checkClientTrusted(X509Certificate[] arg0, String arg1)
+        throws CertificateException {
+    }
+
+    @Override
+    public void checkServerTrusted(X509Certificate[] arg0, String arg1)
+        throws CertificateException {
+    }
+
+    @Override
+    public X509Certificate[] getAcceptedIssuers() {
+      return null;
+    }
+  }
+
+  /**
+   * Create an SSL engine configured to trust any certificate.
+   * @return
+   * @throws SSLException
+   */
+  public static SSLEngine createSslEngine() throws SSLException {
+    try {
+      SSLContext ctx = SSLContext.getInstance("TLS");
+      ctx.init(null, new TrustManager[] { new TrustAnyCert() }, null);
+      return ctx.createSSLEngine();
+    } catch (Exception e) {
+      Throwables.propagateIfInstanceOf(e, SSLException.class);
+      throw Throwables.propagate(e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/5833debe/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
index 1fc6ed9..616937e 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
@@ -19,22 +19,33 @@ package org.apache.kudu.client;
 
 import static org.junit.Assert.*;
 
+import java.nio.ByteBuffer;
 import java.security.AccessControlContext;
 import java.security.AccessController;
+import java.util.List;
 
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLEngineResult.HandshakeStatus;
+import javax.net.ssl.SSLException;
 import javax.security.auth.Subject;
 
 import org.apache.kudu.client.Negotiator.Result;
 import org.apache.kudu.rpc.RpcHeader.ConnectionContextPB;
 import org.apache.kudu.rpc.RpcHeader.NegotiatePB;
+import org.apache.kudu.rpc.RpcHeader.RpcFeatureFlag;
 import org.apache.kudu.rpc.RpcHeader.NegotiatePB.NegotiateStep;
 import org.apache.kudu.rpc.RpcHeader.NegotiatePB.SaslMechanism;
+import org.apache.kudu.util.SecurityUtil;
 import org.apache.kudu.rpc.RpcHeader.ResponseHeader;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.handler.codec.embedder.DecoderEmbedder;
+import org.jboss.netty.handler.ssl.SslHandler;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
 import com.google.protobuf.Message;
 
 public class TestNegotiator {
@@ -66,7 +77,7 @@ public class TestNegotiator {
     RpcOutboundMessage msg = (RpcOutboundMessage) embedder.poll();
     NegotiatePB body = (NegotiatePB) msg.getBody();
     assertEquals(Negotiator.SASL_CALL_ID, msg.getHeader().getCallId());
-    assertEquals(NegotiateStep.NEGOTIATE, ((NegotiatePB)msg.getBody()).getStep());
+    assertEquals(NegotiateStep.NEGOTIATE, body.getStep());
 
     // Respond with NEGOTIATE.
     embedder.offer(fakeResponse(
@@ -104,4 +115,104 @@ public class TestNegotiator {
     assertNotNull(result);
   }
 
+  private static void runTasks(SSLEngineResult result,
+      SSLEngine engine) {
+    if (result.getHandshakeStatus() != HandshakeStatus.NEED_TASK) {
+      return;
+    }
+    Runnable task;
+    while ((task = engine.getDelegatedTask()) != null) {
+      task.run();
+    }
+  }
+
+  private static CallResponse runServerStep(SSLEngine engine,
+      ByteString clientTlsMessage) throws SSLException {
+    Negotiator.LOG.info("Handling TLS message from client: {}", clientTlsMessage.toByteArray());
+    ByteBuffer dst = ByteBuffer.allocate(engine.getSession().getPacketBufferSize());
+    ByteBuffer src = clientTlsMessage.asReadOnlyByteBuffer();
+    do {
+      SSLEngineResult result = engine.unwrap(src, dst);
+      runTasks(result, engine);
+    } while (engine.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_UNWRAP);
+
+    if (engine.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_WRAP) {
+      // The server has more to send.
+      // Produce the ServerHello and send it back to the client.
+      List<ByteString> bufs = Lists.newArrayList();
+      while (engine.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_WRAP) {
+        dst.clear();
+        runTasks(engine.wrap(ByteBuffer.allocate(0), dst), engine);
+        dst.flip();
+        bufs.add(ByteString.copyFrom(dst));
+      }
+      return fakeResponse(
+          ResponseHeader.newBuilder().setCallId(-33).build(),
+          NegotiatePB.newBuilder()
+            .setTlsHandshake(ByteString.copyFrom(bufs))
+            .setStep(NegotiateStep.TLS_HANDSHAKE)
+            .build());
+    } else if (engine.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING) {
+      // Handshake complete.
+      return null;
+    } else {
+      throw new AssertionError("unexpected state: " + engine.getHandshakeStatus());
+    }
+  }
+
+  @Test
+  public void testTlsNegotiation() throws Exception {
+    SSLEngine serverEngine = SecurityUtil.createSslEngine();
+    serverEngine.setUseClientMode(false);
+    // Enable only an anonymous cipher suite, so we don't need to deal with
+    // setting up certs in this test, for now.
+    serverEngine.setEnabledCipherSuites(new String[]{
+        "TLS_DH_anon_WITH_AES_128_CBC_SHA"
+    });
+
+    negotiator.sendHello(embedder.getPipeline().getChannel());
+
+    // Expect client->server: NEGOTIATE, TLS included.
+    RpcOutboundMessage msg = (RpcOutboundMessage) embedder.poll();
+    NegotiatePB body = (NegotiatePB) msg.getBody();
+    assertEquals(NegotiateStep.NEGOTIATE, body.getStep());
+    assertTrue(body.getSupportedFeaturesList().contains(RpcFeatureFlag.TLS));
+
+    // Fake a server response with TLS enabled.
+    embedder.offer(fakeResponse(
+        ResponseHeader.newBuilder().setCallId(-33).build(),
+        NegotiatePB.newBuilder()
+          .addSaslMechanisms(NegotiatePB.SaslMechanism.newBuilder().setMechanism("PLAIN"))
+          .addSupportedFeatures(RpcFeatureFlag.TLS)
+          .setStep(NegotiateStep.NEGOTIATE)
+          .build()));
+
+    // Expect client->server: TLS_HANDSHAKE.
+    msg = (RpcOutboundMessage) embedder.poll();
+    body = (NegotiatePB) msg.getBody();
+    assertEquals(NegotiateStep.TLS_HANDSHAKE, body.getStep());
+
+    // Consume the ClientHello in our fake server, which should generate ServerHello.
+    embedder.offer(runServerStep(serverEngine, body.getTlsHandshake()));
+
+    // Expect client to generate ClientKeyExchange, ChangeCipherSpec, Finished.
+    msg = (RpcOutboundMessage) embedder.poll();
+    body = (NegotiatePB) msg.getBody();
+    assertEquals(NegotiateStep.TLS_HANDSHAKE, body.getStep());
+
+    // Server consumes the above. Should send the TLS "Finished" message.
+    embedder.offer(runServerStep(serverEngine, body.getTlsHandshake()));
+
+    // The pipeline should now have an SSL handler as the first handler.
+    assertTrue(embedder.getPipeline().getFirst() instanceof SslHandler);
+
+    // The Negotiator should have sent the SASL_INITIATE at this point.
+    // NOTE: in a non-mock environment, this message would now be encrypted
+    // by the newly-added TLS handler. But, with the DecoderEmbedder that we're
+    // using, we don't actually end up processing outbound events. Upgrading
+    // to Netty 4 and using EmbeddedChannel instead would make this more realistic.
+    msg = (RpcOutboundMessage) embedder.poll();
+    body = (NegotiatePB) msg.getBody();
+    assertEquals(NegotiateStep.SASL_INITIATE, body.getStep());
+  }
 }


[2/2] kudu git commit: java: implement Channel Bindings

Posted by da...@apache.org.
java: implement Channel Bindings

This adds a utility function for calculating RFC 5929
"tls-server-endpoint" channel bindings in Java. It also hooks the
channel bindings verification in the Negotiator implementation.

A new simple unit test verifies that channel bindings can be calculated
from a cert.

I also added a Java-format KeyStore with a self-signed cert as a test
resource. This was useful for writing a unit test. I generated it based
on the certs found in the C++ source using the following:

openssl pkcs12 -export -in /tmp/cert.pem -inkey /tmp/key.pem > /tmp/p12
keytool -importkeystore -srckeystore /tmp/p12 -destkeystore java/kudu-client/src/test/resources/test-key-and-cert.jks

No new unit test actually verifies the channel bindings verification,
but I did check that the integrity check is working by temporarily
flipping a byte before unwrapping and making sure that verification
failed.

Change-Id: I8b604ea6a0cff55820f7fbbb3ba4beba3a888a48
Reviewed-on: http://gerrit.cloudera.org:8080/5953
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/41dd9d41
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/41dd9d41
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/41dd9d41

Branch: refs/heads/master
Commit: 41dd9d41a1e6a8a0b60104e26ed1f0ece04158e6
Parents: 5833deb
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Feb 8 17:35:48 2017 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Sat Feb 11 00:51:58 2017 +0000

----------------------------------------------------------------------
 .../java/org/apache/kudu/client/Negotiator.java |  64 +++++++++++++++----
 .../java/org/apache/kudu/util/SecurityUtil.java |  57 +++++++++++++++++
 .../org/apache/kudu/client/TestNegotiator.java  |  44 ++++++++++---
 .../src/test/resources/test-key-and-cert.jks    | Bin 0 -> 2212 bytes
 4 files changed, 143 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/41dd9d41/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
index 3b0385b..eba71a3 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
@@ -27,12 +27,14 @@
 package org.apache.kudu.client;
 
 import java.security.PrivilegedExceptionAction;
+import java.security.cert.Certificate;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLPeerUnverifiedException;
 import javax.security.auth.Subject;
 import javax.security.auth.callback.Callback;
 import javax.security.auth.callback.CallbackHandler;
@@ -46,11 +48,9 @@ import javax.security.sasl.SaslException;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.ObjectArrays;
 import com.google.common.collect.Sets;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.ZeroCopyLiteralByteString;
@@ -140,6 +140,8 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
    */
   private ChannelFuture sslHandshakeFuture;
 
+  private Certificate peerCert;
+
   public Negotiator(Subject subject, String remoteHostname) {
     this.subject = subject;
     this.remoteHostname = remoteHostname;
@@ -211,7 +213,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
         handleChallengeResponse(chan, response);
         break;
       case SASL_SUCCESS:
-        handleSuccessResponse(chan);
+        handleSuccessResponse(chan, response);
         break;
       default:
         throw new IllegalStateException("Wrong negotiation step: " +
@@ -263,9 +265,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
       }
       Map<String, String> props = Maps.newHashMap();
       // If we are using GSSAPI with TLS, enable integrity protection, which we use
-      // to securely transmit the channel bindings. Channel bindings aren't
-      // yet implemented, but if we don't advertise 'auth-int', then the server
-      // won't talk to us.
+      // to securely transmit the channel bindings.
       if ("GSSAPI".equals(clientMech) && willUseTls) {
         props.put(Sasl.QOP, "auth-int");
       }
@@ -303,12 +303,6 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
    */
   private void startTlsHandshake(Channel chan) throws SSLException {
     SSLEngine engine = SecurityUtil.createSslEngine();
-    // TODO(todd): remove usage of this anonymous cipher suite.
-    // It's replaced in the next patch in this patch series by
-    // a self-signed cert used for tests.
-    engine.setEnabledCipherSuites(ObjectArrays.concat(
-        engine.getEnabledCipherSuites(),
-        "TLS_DH_anon_WITH_AES_128_CBC_SHA"));
     engine.setUseClientMode(true);
     SslHandler handler = new SslHandler(engine);
     handler.setEnableRenegotiation(false);
@@ -342,7 +336,17 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
     //
     // NOTE: this takes effect immediately (i.e. the following SASL initiation
     // sequence is encrypted).
-    chan.getPipeline().addFirst("tls", sslEmbedder.getPipeline().getFirst());
+    SslHandler handler = (SslHandler)sslEmbedder.getPipeline().getFirst();
+    try {
+      Certificate[] certs = handler.getEngine().getSession().getPeerCertificates();
+      if (certs.length == 0) {
+        throw new SSLPeerUnverifiedException("no peer cert found");
+      }
+    } catch (SSLPeerUnverifiedException e) {
+      throw Throwables.propagate(e);
+    }
+
+    chan.getPipeline().addFirst("tls", handler);
     sendSaslInitiate(chan);
   }
 
@@ -362,6 +366,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
     }
     ByteString data = ByteString.copyFrom(bufs);
     if (sslHandshakeFuture.isDone()) {
+      // TODO(todd): should check sslHandshakeFuture.isSuccess()
       // TODO(danburkert): is this a correct assumption? would the
       // client ever be "done" but also produce handshake data?
       // if it did, would we want to encrypt the SSL message or no?
@@ -409,7 +414,38 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
     sendSaslMessage(chan, builder.build());
   }
 
-  private void handleSuccessResponse(Channel chan) {
+  /**
+   * Verify the channel bindings included in 'response'. This is used only
+   * for GSSAPI-authenticated connections over TLS.
+   * @throws RuntimeException on failure to verify
+   */
+  private void verifyChannelBindings(NegotiatePB response) {
+    try {
+      byte[] expected = SecurityUtil.getEndpointChannelBindings(peerCert);
+      if (!response.hasChannelBindings()) {
+        throw new RuntimeException("no channel bindings provided by remote peer");
+      }
+      byte[] provided = response.getChannelBindings().toByteArray();
+      // NOTE: the C SASL library's implementation of sasl_encode() actually
+      // includes a length prefix. Java's equivalents do not. So, we have to
+      // chop off the length prefix here before unwrapping.
+      if (provided.length < 4) {
+        throw new RuntimeException("invalid too-short channel bindings");
+      }
+      byte[] unwrapped = saslClient.unwrap(provided, 4, provided.length - 4);
+      if (!Bytes.equals(expected, unwrapped)) {
+        throw new RuntimeException("invalid channel bindings provided by remote peer");
+      }
+    } catch (SaslException se) {
+      throw Throwables.propagate(se);
+    }
+  }
+
+  private void handleSuccessResponse(Channel chan, NegotiatePB response) {
+    if (peerCert != null && "GSSAPI".equals(chosenMech)) {
+      verifyChannelBindings(response);
+    }
+
     state = State.FINISHED;
     chan.getPipeline().remove(this);
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/41dd9d41/java/kudu-client/src/main/java/org/apache/kudu/util/SecurityUtil.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/util/SecurityUtil.java b/java/kudu-client/src/main/java/org/apache/kudu/util/SecurityUtil.java
index 163cf96..37e4302 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/util/SecurityUtil.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/util/SecurityUtil.java
@@ -19,6 +19,8 @@ package org.apache.kudu.util;
 
 import java.security.AccessControlContext;
 import java.security.AccessController;
+import java.security.MessageDigest;
+import java.security.cert.Certificate;
 import java.security.cert.CertificateException;
 import java.security.cert.X509Certificate;
 import java.util.HashMap;
@@ -37,7 +39,9 @@ import javax.security.auth.login.LoginContext;
 import javax.security.auth.login.LoginException;
 
 import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,6 +54,26 @@ public abstract class SecurityUtil {
   public static final String KUDU_TICKETCACHE_PROPERTY = "kudu.krb5ccname";
 
   /**
+   * Map from the names of digest algorithms used in X509 certificates to
+   * the appropriate MessageDigest implementation to use for channel-bindings.
+   */
+  private static final Map<String, String> CERT_DIGEST_TO_MESSAGE_DIGEST =
+      ImmutableMap.<String, String>builder()
+      // RFC 5929: if the certificate's signatureAlgorithm uses a single hash
+      // function, and that hash function is either MD5 [RFC1321] or SHA-1
+      // [RFC3174], then use SHA-256 [FIPS-180-3];
+      .put("MD5", "SHA-256")
+      .put("SHA1", "SHA-256")
+      // For other algorithms, use the provided hash function.
+      .put("SHA224", "SHA-224")
+      .put("SHA256", "SHA-256")
+      .put("SHA384", "SHA-384")
+      .put("SHA512", "SHA-512")
+      // The above list is exhaustive as of JDK8's implementation of
+      // SignatureAndHashAlgorithm.
+      .build();
+
+  /**
    * Return the Subject associated with the current thread's AccessController,
    * if that subject has Kerberos credentials. If there is no such subject, or
    * the subject has no Kerberos credentials, logins in a new subject from the
@@ -110,6 +134,39 @@ public abstract class SecurityUtil {
   }
 
   /**
+   * Compute the "tls-server-endpoint" channel binding data for the given X509
+   * certificate. The algorithm is specified in RFC 5929.
+   *
+   * @return the expected channel bindings for the certificate
+   * @throws RuntimeException if the certificate is not an X509 cert, or if
+   * it uses a signature type for which we cannot compute channel bindings
+   */
+  public static byte[] getEndpointChannelBindings(Certificate cert) {
+    Preconditions.checkArgument(cert instanceof X509Certificate,
+        "can only handle X509 certs");
+    X509Certificate x509 = (X509Certificate)cert;
+    String sigAlg = x509.getSigAlgName();
+
+    // The signature algorithm name is a string like 'SHA256withRSA'.
+    // There's no API available to actually find just the digest algorithm,
+    // so we resort to some hackery.
+    String[] components = sigAlg.split("with");
+    String digestAlg = CERT_DIGEST_TO_MESSAGE_DIGEST.get(components[0].toUpperCase());
+    if (digestAlg == null) {
+      // RFC 5929: if the certificate's signatureAlgorithm uses no hash functions or
+      // uses multiple hash functions, then this channel binding type's channel
+      // bindings are undefined at this time (updates to is channel binding type may
+      // occur to address this issue if it ever arises).
+      throw new RuntimeException("cert uses unknown signature algorithm: " + sigAlg);
+    }
+    try {
+      return MessageDigest.getInstance(digestAlg).digest(cert.getEncoded());
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  /**
    * TrustManager implementation which will trust any certificate.
    * TODO(PKI): this needs to change so that it can be configured with
    * the cluster's CA cert.

http://git-wip-us.apache.org/repos/asf/kudu/blob/41dd9d41/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
index 616937e..880ab5b 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestNegotiator.java
@@ -22,8 +22,12 @@ import static org.junit.Assert.*;
 import java.nio.ByteBuffer;
 import java.security.AccessControlContext;
 import java.security.AccessController;
+import java.security.KeyStore;
+import java.security.cert.Certificate;
 import java.util.List;
 
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLEngineResult;
 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
@@ -34,9 +38,9 @@ import org.apache.kudu.client.Negotiator.Result;
 import org.apache.kudu.rpc.RpcHeader.ConnectionContextPB;
 import org.apache.kudu.rpc.RpcHeader.NegotiatePB;
 import org.apache.kudu.rpc.RpcHeader.RpcFeatureFlag;
+import org.apache.kudu.util.SecurityUtil;
 import org.apache.kudu.rpc.RpcHeader.NegotiatePB.NegotiateStep;
 import org.apache.kudu.rpc.RpcHeader.NegotiatePB.SaslMechanism;
-import org.apache.kudu.util.SecurityUtil;
 import org.apache.kudu.rpc.RpcHeader.ResponseHeader;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.handler.codec.embedder.DecoderEmbedder;
@@ -44,6 +48,7 @@ import org.jboss.netty.handler.ssl.SslHandler;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Message;
@@ -52,6 +57,8 @@ public class TestNegotiator {
   private Negotiator negotiator;
   private DecoderEmbedder<Object> embedder;
 
+  private static final char[] password = "password".toCharArray();
+
   @Before
   public void setup() {
     AccessControlContext context = AccessController.getContext();
@@ -66,6 +73,33 @@ public class TestNegotiator {
     return new CallResponse(buf);
   }
 
+  KeyStore loadTestKeystore() throws Exception {
+    KeyStore ks = KeyStore.getInstance("JKS");
+    ks.load(TestNegotiator.class.getResourceAsStream("/test-key-and-cert.jks"),
+        password);
+    return ks;
+  }
+
+  SSLEngine createServerEngine() {
+    try {
+      KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
+      kmf.init(loadTestKeystore(), password);
+      SSLContext ctx = SSLContext.getInstance("TLS");
+      ctx.init(kmf.getKeyManagers(), null, null);
+      return ctx.createSSLEngine();
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Test
+  public void testChannelBinding() throws Exception {
+    KeyStore ks = loadTestKeystore();
+    Certificate cert = ks.getCertificate("1");
+    byte[] bindings = SecurityUtil.getEndpointChannelBindings(cert);
+    assertEquals(32, bindings.length);
+  }
+
   /**
    * Simple test case for a PLAIN negotiation.
    */
@@ -162,14 +196,8 @@ public class TestNegotiator {
 
   @Test
   public void testTlsNegotiation() throws Exception {
-    SSLEngine serverEngine = SecurityUtil.createSslEngine();
+    SSLEngine serverEngine = createServerEngine();
     serverEngine.setUseClientMode(false);
-    // Enable only an anonymous cipher suite, so we don't need to deal with
-    // setting up certs in this test, for now.
-    serverEngine.setEnabledCipherSuites(new String[]{
-        "TLS_DH_anon_WITH_AES_128_CBC_SHA"
-    });
-
     negotiator.sendHello(embedder.getPipeline().getChannel());
 
     // Expect client->server: NEGOTIATE, TLS included.

http://git-wip-us.apache.org/repos/asf/kudu/blob/41dd9d41/java/kudu-client/src/test/resources/test-key-and-cert.jks
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/resources/test-key-and-cert.jks b/java/kudu-client/src/test/resources/test-key-and-cert.jks
new file mode 100644
index 0000000..45d885d
Binary files /dev/null and b/java/kudu-client/src/test/resources/test-key-and-cert.jks differ