You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2019/05/20 15:24:25 UTC

[kafka] branch trunk updated: KAFKA-8381; Disable hostname validation when verifying inter-broker SSL (#6757)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 614ea55  KAFKA-8381; Disable hostname validation when verifying inter-broker SSL (#6757)
614ea55 is described below

commit 614ea55ad7495c3abcc5942caaf5274ad4029327
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Mon May 20 16:24:11 2019 +0100

    KAFKA-8381; Disable hostname validation when verifying inter-broker SSL (#6757)
    
    - Make endpoint validation configurable on SslEngineBuilder when creating an engine
    - Disable endpoint validation for engines created for inter-broker SSL validation since it is unsafe to use `localhost`
    - Use empty hostname in validation engine to ensure tests fail if validation is re-enabled by mistake
    - Add tests to verify inter-broker SSL validation
    
    Reviewers: Manikumar Reddy <ma...@gmail.com>
---
 .../common/security/ssl/SslEngineBuilder.java      | 10 +++---
 .../kafka/common/security/ssl/SslFactory.java      | 18 +++++++---
 .../common/network/SslTransportLayerTest.java      | 40 ++++++++++++++++++++++
 3 files changed, 57 insertions(+), 11 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslEngineBuilder.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslEngineBuilder.java
index 9aaf511..fe9e135 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslEngineBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslEngineBuilder.java
@@ -57,7 +57,6 @@ public class SslEngineBuilder {
     private final SecurityStore truststore;
     private final String[] cipherSuites;
     private final String[] enabledProtocols;
-    private final String endpointIdentification;
     private final SecureRandom secureRandomImplementation;
     private final SSLContext sslContext;
     private final SslClientAuth sslClientAuth;
@@ -82,8 +81,6 @@ public class SslEngineBuilder {
             this.enabledProtocols = null;
         }
 
-        this.endpointIdentification = (String) configs.get(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG);
-
         this.secureRandomImplementation = createSecureRandom((String)
                 configs.get(SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG));
 
@@ -201,11 +198,12 @@ public class SslEngineBuilder {
      * Create a new SSLEngine object.
      *
      * @param mode      Whether to use client or server mode.
-     * @param peerHost  The peer host to use.
-     * @param peerPort  The peer port to use.
+     * @param peerHost  The peer host to use. This is used in client mode if endpoint validation is enabled.
+     * @param peerPort  The peer port to use. This is a hint and not used for validation.
+     * @param endpointIdentification Endpoint identification algorithm for client mode.
      * @return          The new SSLEngine.
      */
-    public SSLEngine createSslEngine(Mode mode, String peerHost, int peerPort) {
+    public SSLEngine createSslEngine(Mode mode, String peerHost, int peerPort, String endpointIdentification) {
         SSLEngine sslEngine = sslContext.createSSLEngine(peerHost, peerPort);
         if (cipherSuites != null) sslEngine.setEnabledCipherSuites(cipherSuites);
         if (enabledProtocols != null) sslEngine.setEnabledProtocols(enabledProtocols);
diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
index f8c8f3a..882b63d 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
@@ -52,6 +52,7 @@ public class SslFactory implements Reconfigurable {
     private final Mode mode;
     private final String clientAuthConfigOverride;
     private final boolean keystoreVerifiableUsingTruststore;
+    private String endpointIdentification;
     private SslEngineBuilder sslEngineBuilder;
 
     public SslFactory(Mode mode) {
@@ -80,6 +81,8 @@ public class SslFactory implements Reconfigurable {
         if (sslEngineBuilder != null) {
             throw new IllegalStateException("SslFactory was already configured.");
         }
+        this.endpointIdentification = (String) configs.get(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG);
+
         Map<String, Object> nextConfigs = new HashMap<>();
         copyMapEntries(nextConfigs, configs, SslConfigs.NON_RECONFIGURABLE_CONFIGS);
         copyMapEntries(nextConfigs, configs, SslConfigs.RECONFIGURABLE_CONFIGS);
@@ -162,7 +165,7 @@ public class SslFactory implements Reconfigurable {
         if (sslEngineBuilder == null) {
             throw new IllegalStateException("SslFactory has not been configured.");
         }
-        return sslEngineBuilder.createSslEngine(mode, peerHost, peerPort);
+        return sslEngineBuilder.createSslEngine(mode, peerHost, peerPort, endpointIdentification);
     }
 
     public SslEngineBuilder sslEngineBuilder() {
@@ -246,10 +249,15 @@ public class SslFactory implements Reconfigurable {
 
         static void validate(SslEngineBuilder oldEngineBuilder,
                              SslEngineBuilder newEngineBuilder) throws SSLException {
-            validate(oldEngineBuilder.createSslEngine(Mode.SERVER, "localhost", 0),
-                    newEngineBuilder.createSslEngine(Mode.CLIENT, "localhost", 0));
-            validate(newEngineBuilder.createSslEngine(Mode.SERVER, "localhost", 0),
-                    oldEngineBuilder.createSslEngine(Mode.CLIENT, "localhost", 0));
+            validate(createSslEngineForValidation(oldEngineBuilder, Mode.SERVER),
+                    createSslEngineForValidation(newEngineBuilder, Mode.CLIENT));
+            validate(createSslEngineForValidation(newEngineBuilder, Mode.SERVER),
+                    createSslEngineForValidation(oldEngineBuilder, Mode.CLIENT));
+        }
+
+        private static SSLEngine createSslEngineForValidation(SslEngineBuilder sslEngineBuilder, Mode mode) {
+            // Validation engines get an empty peer host and an empty endpoint identification algorithm, so hostname verification is disabled
+            return sslEngineBuilder.createSslEngine(mode, "", 0, "");
         }
 
         static void validate(SSLEngine clientEngine, SSLEngine serverEngine) throws SSLException {
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index 118baec..d25fa61 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -889,6 +889,46 @@ public class SslTransportLayerTest {
     }
 
     /**
+     * Verifies that an inter-broker listener, whose truststore is validated against its keystore,
+     * can be created and accepts connections when mutual authentication and hostname verification are configured.
+     */
+    @Test
+    public void testInterBrokerSslConfigValidation() throws Exception {
+        SecurityProtocol securityProtocol = SecurityProtocol.SSL;
+        sslServerConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "required"); // mutual authentication
+        sslServerConfigs.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS"); // hostname verification
+        sslServerConfigs.putAll(serverCertStores.keyStoreProps());
+        sslServerConfigs.putAll(serverCertStores.trustStoreProps());
+        sslClientConfigs.putAll(serverCertStores.keyStoreProps()); // client reuses the server's keystore
+        sslClientConfigs.putAll(serverCertStores.trustStoreProps()); // and the server's truststore
+        TestSecurityConfig config = new TestSecurityConfig(sslServerConfigs);
+        ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol);
+        ChannelBuilder serverChannelBuilder = ChannelBuilders.serverChannelBuilder(listenerName,
+                true, securityProtocol, config, null, null, time); // NOTE(review): 'true' presumably marks the inter-broker listener - confirm
+        server = new NioEchoServer(listenerName, securityProtocol, config,
+                "localhost", serverChannelBuilder, null, time);
+        server.start();
+
+        this.selector = createSelector(sslClientConfigs, null, null, null);
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
+        selector.connect("0", addr, BUFFER_SIZE, BUFFER_SIZE);
+        NetworkTestUtils.checkClientConnection(selector, "0", 100, 10);
+    }
+
+    /**
+     * Verifies that creating the channel builder for an inter-broker listener throws a KafkaException
+     * when validation of the truststore against the keystore fails because the keystore's certs are not trusted.
+     */
+    @Test(expected = KafkaException.class)
+    public void testInterBrokerSslConfigValidationFailure() throws Exception {
+        SecurityProtocol securityProtocol = SecurityProtocol.SSL;
+        sslServerConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "required");
+        TestSecurityConfig config = new TestSecurityConfig(sslServerConfigs);
+        ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol);
+        ChannelBuilders.serverChannelBuilder(listenerName, true, securityProtocol, config, null, null, time);
+    }
+
+    /**
      * Tests reconfiguration of server keystore. Verifies that existing connections continue
      * to work with old keystore and new connections work with new keystore.
      */