You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sr...@apache.org on 2019/05/02 22:11:54 UTC

[kafka] branch trunk updated: KAFKA-8191: Add pluggability of KeyManager to generate the broker Private Keys and Certificates

This is an automated email from the ASF dual-hosted git repository.

sriharsha pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b074173  KAFKA-8191: Add pluggability of KeyManager to generate the broker Private Keys and Certificates
b074173 is described below

commit b074173ea249ef028272c2c07358222550917d8c
Author: saisandeep <ms...@uber.com>
AuthorDate: Thu May 2 15:11:42 2019 -0700

    KAFKA-8191: Add pluggability of KeyManager to generate the broker Private Keys and Certificates
    
    Reviewers: Sriharsha Chintalapani <sr...@apache.org>, Ismael Juma <is...@juma.me.uk>
---
 .../kafka/common/security/ssl/SslFactory.java      |  12 ++-
 .../apache/kafka/common/network/SelectorTest.java  |   2 +-
 .../kafka/common/network/SslSelectorTest.java      |  49 +++++++++
 .../kafka/common/security/ssl/SslFactoryTest.java  |  23 ++++-
 .../security/ssl/mock/TestKeyManagerFactory.java   | 113 +++++++++++++++++++++
 .../common/security/ssl/mock/TestProvider.java     |  36 +++++++
 .../security/ssl/mock/TestTrustManagerFactory.java |  88 ++++++++++++++++
 .../java/org/apache/kafka/test/TestSslUtils.java   |  18 +++-
 8 files changed, 332 insertions(+), 9 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
index a03d1bc..73d9210 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
@@ -234,12 +234,16 @@ public class SslFactory implements Reconfigurable {
             sslContext = SSLContext.getInstance(protocol);
 
         KeyManager[] keyManagers = null;
-        if (keystore != null) {
+        if (keystore != null || kmfAlgorithm != null) {
             String kmfAlgorithm = this.kmfAlgorithm != null ? this.kmfAlgorithm : KeyManagerFactory.getDefaultAlgorithm();
             KeyManagerFactory kmf = KeyManagerFactory.getInstance(kmfAlgorithm);
-            KeyStore ks = keystore.load();
-            Password keyPassword = keystore.keyPassword != null ? keystore.keyPassword : keystore.password;
-            kmf.init(ks, keyPassword.value().toCharArray());
+            if (keystore != null) {
+                KeyStore ks = keystore.load();
+                Password keyPassword = keystore.keyPassword != null ? keystore.keyPassword : keystore.password;
+                kmf.init(ks, keyPassword.value().toCharArray());
+            } else {
+                kmf.init(null,  null);
+            }
             keyManagers = kmf.getKeyManagers();
         }
 
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index 0f2d295..ae06836 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -813,7 +813,7 @@ public class SelectorTest {
         verifySelectorEmpty(this.selector);
     }
 
-    private void verifySelectorEmpty(Selector selector) throws Exception {
+    public void verifySelectorEmpty(Selector selector) throws Exception {
         for (KafkaChannel channel : selector.channels()) {
             selector.close(channel.id());
             assertNull(channel.selectionKey().attachment());
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
index 1f9739b..02cbaf8 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
@@ -23,8 +23,12 @@ import org.apache.kafka.common.memory.SimpleMemoryPool;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.security.ssl.SslFactory;
+import org.apache.kafka.common.security.ssl.mock.TestKeyManagerFactory;
+import org.apache.kafka.common.security.ssl.mock.TestProvider;
+import org.apache.kafka.common.security.ssl.mock.TestTrustManagerFactory;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestSslUtils;
 import org.apache.kafka.test.TestUtils;
@@ -37,6 +41,8 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
+import java.security.Provider;
+import java.security.Security;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -86,6 +92,49 @@ public class SslSelectorTest extends SelectorTest {
     }
 
     @Test
+    public void testConnectionWithCustomKeyManager() throws Exception {
+        Provider provider = new TestProvider();
+        Security.addProvider(provider);
+
+        int requestSize = 100 * 1024;
+        final String node = "0";
+        String request = TestUtils.randomString(requestSize);
+        // Server side: configured only with the custom key/trust manager algorithm names (see TestSslUtils.createSslConfig(String, String)).
+        Map<String, Object> sslServerConfigs = TestSslUtils.createSslConfig(
+                TestKeyManagerFactory.ALGORITHM,
+                TestTrustManagerFactory.ALGORITHM
+        );
+        EchoServer server = new EchoServer(SecurityProtocol.SSL, sslServerConfigs);
+        server.start();
+        Time time = new MockTime();
+        File trustStoreFile = new File(TestKeyManagerFactory.TestKeyManager.mockTrustStoreFile);
+        Map<String, Object> sslClientConfigs = TestSslUtils.createSslConfig(true, true, Mode.CLIENT, trustStoreFile, "client");
+        // NOTE(review): mockTrustStoreFile is assigned in the TestKeyManager constructor, so the read above relies on one having been created already.
+        ChannelBuilder channelBuilder = new TestSslChannelBuilder(Mode.CLIENT);
+        channelBuilder.configure(sslClientConfigs);
+        Metrics metrics = new Metrics();
+        Selector selector = new Selector(5000, metrics, time, "MetricGroup", channelBuilder, new LogContext());
+        // Connect, then poll until the selector reports the channel ready.
+        selector.connect(node, new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE);
+        while (!selector.connected().contains(node))
+            selector.poll(10000L);
+        while (!selector.isChannelReady(node))
+            selector.poll(10000L);
+
+        selector.send(createSend(node, request));
+
+        waitForBytesBuffered(selector, node);
+
+        selector.close(node);
+        super.verifySelectorEmpty(selector);
+        // NOTE(review): cleanup below is not in a finally block; a failure above leaves TestProvider registered and the server/selector/metrics open.
+        Security.removeProvider(provider.getName());
+        selector.close();
+        server.close();
+        metrics.close();
+    }
+
+    @Test
     public void testDisconnectWithIntermediateBufferedBytes() throws Exception {
         int requestSize = 100 * 1024;
         final String node = "0";
diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
index 11035d0..a6ce0b4 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.security.ssl;
 import java.io.File;
 import java.nio.file.Files;
 import java.security.KeyStore;
+import java.security.Provider;
 import java.util.Map;
 
 import javax.net.ssl.SSLContext;
@@ -28,6 +29,9 @@ import javax.net.ssl.SSLHandshakeException;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.security.ssl.mock.TestKeyManagerFactory;
+import org.apache.kafka.common.security.ssl.mock.TestProvider;
+import org.apache.kafka.common.security.ssl.mock.TestTrustManagerFactory;
 import org.apache.kafka.test.TestSslUtils;
 import org.apache.kafka.common.network.Mode;
 import org.junit.Test;
@@ -41,10 +45,8 @@ import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import java.security.Security;
 
-/**
- * A set of tests for the selector over ssl. These use a test harness that runs a simple socket server that echos back responses.
- */
 public class SslFactoryTest {
 
     @Test
@@ -62,6 +64,21 @@ public class SslFactoryTest {
     }
 
     @Test
+    public void testSslFactoryWithCustomKeyManagerConfiguration() throws Exception {
+        Provider provider = new TestProvider();
+        Security.addProvider(provider); // registers the test key/trust manager factories for the duration of this test
+        Map<String, Object> serverSslConfig = TestSslUtils.createSslConfig(
+                TestKeyManagerFactory.ALGORITHM,
+                TestTrustManagerFactory.ALGORITHM
+        );
+        SslFactory sslFactory = new SslFactory(Mode.SERVER);
+        sslFactory.configure(serverSslConfig); // config carries algorithm names only, no keystore/truststore entries
+        SSLContext sslContext = sslFactory.createSSLContext(null, null);
+        assertNotNull("SSL context not created", sslContext);
+        Security.removeProvider(provider.getName()); // NOTE(review): skipped if anything above throws; consider try/finally
+    }
+
+    @Test
     public void testSslFactoryWithoutPasswordConfiguration() throws Exception {
         File trustStoreFile = File.createTempFile("truststore", ".jks");
         Map<String, Object> serverSslConfig = TestSslUtils.createSslConfig(false, true, Mode.SERVER, trustStoreFile, "server");
diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestKeyManagerFactory.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestKeyManagerFactory.java
new file mode 100644
index 0000000..dc686c2
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestKeyManagerFactory.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.ssl.mock;
+
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.KeyManagerFactorySpi;
+import javax.net.ssl.ManagerFactoryParameters;
+import javax.net.ssl.X509ExtendedKeyManager;
+import java.io.File;
+import java.io.IOException;
+import java.net.Socket;
+import java.security.GeneralSecurityException;
+import java.security.KeyPair;
+import java.security.KeyStore;
+import java.security.Principal;
+import java.security.PrivateKey;
+import java.security.cert.X509Certificate;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.test.TestSslUtils;
+import org.apache.kafka.test.TestSslUtils.CertificateBuilder;
+
+public class TestKeyManagerFactory extends KeyManagerFactorySpi {
+    public static final String ALGORITHM = "TestAlgorithm"; // algorithm name TestProvider registers this factory under
+
+    @Override
+    protected void engineInit(KeyStore keyStore, char[] chars) {
+        // No-op: the supplied key store and password are ignored; TestKeyManager generates its own key material.
+    }
+
+    @Override
+    protected void engineInit(ManagerFactoryParameters managerFactoryParameters) {
+        // No-op: factory parameters are ignored.
+    }
+    // Each call builds a fresh TestKeyManager, i.e. a new key pair, certificate and trust store file.
+    @Override
+    protected KeyManager[] engineGetKeyManagers() {
+        return new KeyManager[] {new TestKeyManager()};
+    }
+    /** Key manager that always answers with {@link #ALIAS} and returns the same generated key and certificate for any alias. */
+    public static class TestKeyManager extends X509ExtendedKeyManager {
+        // Path of the trust store written by the most recently constructed instance; null until one has been constructed.
+        public static String mockTrustStoreFile;
+        public static final String ALIAS = "TestAlias";
+        private static final String CN = "localhost";
+        private static final String SIGNATURE_ALGORITHM = "RSA";
+        private KeyPair keyPair;
+        private X509Certificate certificate;
+        // Generates the key pair and certificate, then writes the certificate into a temporary trust store file.
+        protected TestKeyManager() {
+            try {
+                this.keyPair = TestSslUtils.generateKeyPair(SIGNATURE_ALGORITHM);
+                CertificateBuilder certBuilder = new CertificateBuilder();
+                this.certificate = certBuilder.generate("CN=" + CN + ", O=A server", this.keyPair);
+                Map<String, X509Certificate> certificates = new HashMap<>();
+                certificates.put(ALIAS, certificate);
+                File trustStoreFile = File.createTempFile("testTrustStore", ".jks");
+                mockTrustStoreFile = trustStoreFile.getPath();
+                TestSslUtils.createTrustStore(mockTrustStoreFile, new Password(TestSslUtils.TRUST_STORE_PASSWORD), certificates);
+            } catch (IOException | GeneralSecurityException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public String[] getClientAliases(String s, Principal[] principals) {
+            return new String[] {ALIAS};
+        }
+
+        @Override
+        public String chooseClientAlias(String[] strings, Principal[] principals, Socket socket) {
+            return ALIAS;
+        }
+
+        @Override
+        public String[] getServerAliases(String s, Principal[] principals) {
+            return new String[] {ALIAS};
+        }
+
+        @Override
+        public String chooseServerAlias(String s, Principal[] principals, Socket socket) {
+            return ALIAS;
+        }
+
+        @Override
+        public X509Certificate[] getCertificateChain(String s) {
+            return new X509Certificate[] {this.certificate};
+        }
+
+        @Override
+        public PrivateKey getPrivateKey(String s) {
+            return this.keyPair.getPrivate();
+        }
+    }
+
+}
+
diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestProvider.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestProvider.java
new file mode 100644
index 0000000..fb44d3c
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestProvider.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.ssl.mock;
+
+import java.security.Provider;
+
+public class TestProvider extends Provider {
+    // Property keys of the form "<engine type>.<algorithm>"; the constructor maps each to an implementation class name.
+    private static final String KEY_MANAGER_FACTORY = String.format("KeyManagerFactory.%s", TestKeyManagerFactory.ALGORITHM);
+    private static final String TRUST_MANAGER_FACTORY = String.format("TrustManagerFactory.%s", TestTrustManagerFactory.ALGORITHM);
+    /** Creates the provider with name "TestProvider", version 0.1 and a fixed description. */
+    public TestProvider() {
+        this("TestProvider", 0.1, "provider for test cases");
+    }
+    /** Creates the provider and registers the test key/trust manager factory classes under the keys above. */
+    protected TestProvider(String name, double version, String info) {
+        super(name, version, info);
+        super.put(KEY_MANAGER_FACTORY, TestKeyManagerFactory.class.getName());
+        super.put(TRUST_MANAGER_FACTORY, TestTrustManagerFactory.class.getName());
+    }
+
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestTrustManagerFactory.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestTrustManagerFactory.java
new file mode 100644
index 0000000..4115a5f
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestTrustManagerFactory.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.ssl.mock;
+
+import javax.net.ssl.ManagerFactoryParameters;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactorySpi;
+import javax.net.ssl.X509ExtendedTrustManager;
+import java.net.Socket;
+import java.security.KeyStore;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+
+public class TestTrustManagerFactory extends TrustManagerFactorySpi {
+    public static final String ALGORITHM = "TestAlgorithm"; // algorithm name TestProvider registers this factory under
+
+    @Override
+    protected void engineInit(KeyStore keyStore) {
+        // No-op: the supplied key store is ignored.
+    }
+
+    @Override
+    protected void engineInit(ManagerFactoryParameters managerFactoryParameters) {
+        // No-op: factory parameters are ignored.
+    }
+
+    @Override
+    protected TrustManager[] engineGetTrustManagers() {
+        return new TrustManager[] {new TestTrustManager()};
+    }
+    /** Trust manager whose check methods never throw, so every certificate chain is accepted. Intended for tests only. */
+    public static class TestTrustManager extends X509ExtendedTrustManager {
+
+        public static final String ALIAS = "TestAlias";
+
+        @Override
+        public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
+            // Accepts any chain: no validation is performed.
+        }
+
+        @Override
+        public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
+            // Accepts any chain: no validation is performed.
+        }
+
+        @Override
+        public X509Certificate[] getAcceptedIssuers() {
+            return new X509Certificate[0];
+        }
+
+        @Override
+        public void checkClientTrusted(X509Certificate[] x509Certificates, String s, Socket socket) throws CertificateException {
+            // Accepts any chain: no validation is performed.
+        }
+
+        @Override
+        public void checkServerTrusted(X509Certificate[] x509Certificates, String s, Socket socket) throws CertificateException {
+            // Accepts any chain: no validation is performed.
+        }
+
+        @Override
+        public void checkClientTrusted(X509Certificate[] x509Certificates, String s, SSLEngine sslEngine) throws CertificateException {
+            // Accepts any chain: no validation is performed.
+        }
+
+        @Override
+        public void checkServerTrusted(X509Certificate[] x509Certificates, String s, SSLEngine sslEngine) throws CertificateException {
+            // Accepts any chain: no validation is performed.
+        }
+    }
+
+}
+
diff --git a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
index b2de0e6..6dfceb2 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
@@ -70,6 +70,8 @@ import java.util.ArrayList;
 
 public class TestSslUtils {
 
+    public static final String TRUST_STORE_PASSWORD = "TrustStorePassword";
+
     /**
      * Create a self-signed X.509 Certificate.
      * From http://bfo.com/blog/2011/03/08/odds_and_ends_creating_a_new_x_509_certificate.html.
@@ -175,6 +177,20 @@ public class TestSslUtils {
         return sslConfigs;
     }
 
+    public static Map<String, Object> createSslConfig(String keyManagerAlgorithm, String trustManagerAlgorithm) {
+        Map<String, Object> sslConfigs = new HashMap<>();
+        sslConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2"); // protocol to create SSLContext
+        // Only the manager-factory algorithm names are set; no keystore or truststore entries are added to the map.
+        sslConfigs.put(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, keyManagerAlgorithm);
+        sslConfigs.put(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, trustManagerAlgorithm);
+        // Enable only TLSv1.2, the same protocol configured for the SSLContext above.
+        List<String> enabledProtocols  = new ArrayList<>();
+        enabledProtocols.add("TLSv1.2");
+        sslConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, enabledProtocols);
+
+        return sslConfigs;
+    }
+
     public static  Map<String, Object> createSslConfig(boolean useClientCert, boolean trustStore, Mode mode, File trustStoreFile, String certAlias)
         throws IOException, GeneralSecurityException {
         return createSslConfig(useClientCert, trustStore, mode, trustStoreFile, certAlias, "localhost");
@@ -193,7 +209,7 @@ public class TestSslUtils {
         File keyStoreFile = null;
         Password password = mode == Mode.SERVER ? new Password("ServerPassword") : new Password("ClientPassword");
 
-        Password trustStorePassword = new Password("TrustStorePassword");
+        Password trustStorePassword = new Password(TRUST_STORE_PASSWORD);
 
         if (mode == Mode.CLIENT && useClientCert) {
             keyStoreFile = File.createTempFile("clientKS", ".jks");