You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2019/05/20 15:24:25 UTC
[kafka] branch trunk updated: KAFKA-8381;
Disable hostname validation when verifying inter-broker SSL (#6757)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 614ea55 KAFKA-8381; Disable hostname validation when verifying inter-broker SSL (#6757)
614ea55 is described below
commit 614ea55ad7495c3abcc5942caaf5274ad4029327
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Mon May 20 16:24:11 2019 +0100
KAFKA-8381; Disable hostname validation when verifying inter-broker SSL (#6757)
- Make endpoint validation configurable on SslEngineBuilder when creating an engine
- Disable endpoint validation for engines created for inter-broker SSL validation since it is unsafe to use `localhost`
- Use empty hostname in validation engine to ensure tests fail if validation is re-enabled by mistake
- Add tests to verify inter-broker SSL validation
Reviewers: Manikumar Reddy <ma...@gmail.com>
---
.../common/security/ssl/SslEngineBuilder.java | 10 +++---
.../kafka/common/security/ssl/SslFactory.java | 18 +++++++---
.../common/network/SslTransportLayerTest.java | 40 ++++++++++++++++++++++
3 files changed, 57 insertions(+), 11 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslEngineBuilder.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslEngineBuilder.java
index 9aaf511..fe9e135 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslEngineBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslEngineBuilder.java
@@ -57,7 +57,6 @@ public class SslEngineBuilder {
private final SecurityStore truststore;
private final String[] cipherSuites;
private final String[] enabledProtocols;
- private final String endpointIdentification;
private final SecureRandom secureRandomImplementation;
private final SSLContext sslContext;
private final SslClientAuth sslClientAuth;
@@ -82,8 +81,6 @@ public class SslEngineBuilder {
this.enabledProtocols = null;
}
- this.endpointIdentification = (String) configs.get(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG);
-
this.secureRandomImplementation = createSecureRandom((String)
configs.get(SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG));
@@ -201,11 +198,12 @@ public class SslEngineBuilder {
* Create a new SSLEngine object.
*
* @param mode Whether to use client or server mode.
- * @param peerHost The peer host to use.
- * @param peerPort The peer port to use.
+ * @param peerHost The peer host to use. This is used in client mode if endpoint validation is enabled.
+ * @param peerPort The peer port to use. This is a hint and not used for validation.
+ * @param endpointIdentification Endpoint identification algorithm for client mode.
* @return The new SSLEngine.
*/
- public SSLEngine createSslEngine(Mode mode, String peerHost, int peerPort) {
+ public SSLEngine createSslEngine(Mode mode, String peerHost, int peerPort, String endpointIdentification) {
SSLEngine sslEngine = sslContext.createSSLEngine(peerHost, peerPort);
if (cipherSuites != null) sslEngine.setEnabledCipherSuites(cipherSuites);
if (enabledProtocols != null) sslEngine.setEnabledProtocols(enabledProtocols);
diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
index f8c8f3a..882b63d 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
@@ -52,6 +52,7 @@ public class SslFactory implements Reconfigurable {
private final Mode mode;
private final String clientAuthConfigOverride;
private final boolean keystoreVerifiableUsingTruststore;
+ private String endpointIdentification;
private SslEngineBuilder sslEngineBuilder;
public SslFactory(Mode mode) {
@@ -80,6 +81,8 @@ public class SslFactory implements Reconfigurable {
if (sslEngineBuilder != null) {
throw new IllegalStateException("SslFactory was already configured.");
}
+ this.endpointIdentification = (String) configs.get(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG);
+
Map<String, Object> nextConfigs = new HashMap<>();
copyMapEntries(nextConfigs, configs, SslConfigs.NON_RECONFIGURABLE_CONFIGS);
copyMapEntries(nextConfigs, configs, SslConfigs.RECONFIGURABLE_CONFIGS);
@@ -162,7 +165,7 @@ public class SslFactory implements Reconfigurable {
if (sslEngineBuilder == null) {
throw new IllegalStateException("SslFactory has not been configured.");
}
- return sslEngineBuilder.createSslEngine(mode, peerHost, peerPort);
+ return sslEngineBuilder.createSslEngine(mode, peerHost, peerPort, endpointIdentification);
}
public SslEngineBuilder sslEngineBuilder() {
@@ -246,10 +249,15 @@ public class SslFactory implements Reconfigurable {
static void validate(SslEngineBuilder oldEngineBuilder,
SslEngineBuilder newEngineBuilder) throws SSLException {
- validate(oldEngineBuilder.createSslEngine(Mode.SERVER, "localhost", 0),
- newEngineBuilder.createSslEngine(Mode.CLIENT, "localhost", 0));
- validate(newEngineBuilder.createSslEngine(Mode.SERVER, "localhost", 0),
- oldEngineBuilder.createSslEngine(Mode.CLIENT, "localhost", 0));
+ validate(createSslEngineForValidation(oldEngineBuilder, Mode.SERVER),
+ createSslEngineForValidation(newEngineBuilder, Mode.CLIENT));
+ validate(createSslEngineForValidation(newEngineBuilder, Mode.SERVER),
+ createSslEngineForValidation(oldEngineBuilder, Mode.CLIENT));
+ }
+
+ private static SSLEngine createSslEngineForValidation(SslEngineBuilder sslEngineBuilder, Mode mode) {
+ // Use empty hostname, disable hostname verification
+ return sslEngineBuilder.createSslEngine(mode, "", 0, "");
}
static void validate(SSLEngine clientEngine, SSLEngine serverEngine) throws SSLException {
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index 118baec..d25fa61 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -889,6 +889,46 @@ public class SslTransportLayerTest {
}
/**
+ * Verifies that inter-broker listener with validation of truststore against keystore works
+ * with configs including mutual authentication and hostname verification.
+ */
+ @Test
+ public void testInterBrokerSslConfigValidation() throws Exception {
+ SecurityProtocol securityProtocol = SecurityProtocol.SSL;
+ sslServerConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "required");
+ sslServerConfigs.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS");
+ sslServerConfigs.putAll(serverCertStores.keyStoreProps());
+ sslServerConfigs.putAll(serverCertStores.trustStoreProps());
+ sslClientConfigs.putAll(serverCertStores.keyStoreProps());
+ sslClientConfigs.putAll(serverCertStores.trustStoreProps());
+ TestSecurityConfig config = new TestSecurityConfig(sslServerConfigs);
+ ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol);
+ ChannelBuilder serverChannelBuilder = ChannelBuilders.serverChannelBuilder(listenerName,
+ true, securityProtocol, config, null, null, time);
+ server = new NioEchoServer(listenerName, securityProtocol, config,
+ "localhost", serverChannelBuilder, null, time);
+ server.start();
+
+ this.selector = createSelector(sslClientConfigs, null, null, null);
+ InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
+ selector.connect("0", addr, BUFFER_SIZE, BUFFER_SIZE);
+ NetworkTestUtils.checkClientConnection(selector, "0", 100, 10);
+ }
+
+ /**
+ * Verifies that inter-broker listener with validation of truststore against keystore
+ * fails if certs from keystore are not trusted.
+ */
+ @Test(expected = KafkaException.class)
+ public void testInterBrokerSslConfigValidationFailure() throws Exception {
+ SecurityProtocol securityProtocol = SecurityProtocol.SSL;
+ sslServerConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "required");
+ TestSecurityConfig config = new TestSecurityConfig(sslServerConfigs);
+ ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol);
+ ChannelBuilders.serverChannelBuilder(listenerName, true, securityProtocol, config, null, null, time);
+ }
+
+ /**
* Tests reconfiguration of server keystore. Verifies that existing connections continue
* to work with old keystore and new connections work with new keystore.
*/