You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sr...@apache.org on 2015/10/24 18:45:05 UTC
[2/4] kafka git commit: KAFKA-2460; Fix capitalisation in SSL classes
http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/clients/src/test/java/org/apache/kafka/common/network/SSLTransportLayerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SSLTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SSLTransportLayerTest.java
deleted file mode 100644
index ebb59b5..0000000
--- a/clients/src/test/java/org/apache/kafka/common/network/SSLTransportLayerTest.java
+++ /dev/null
@@ -1,652 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.common.network;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.io.IOException;
-import java.io.File;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.config.SSLConfigs;
-import org.apache.kafka.common.security.ssl.SSLFactory;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.test.TestSSLUtils;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Tests for the SSL transport layer. These use a test harness that runs a simple socket server that echos back responses.
- */
-
-public class SSLTransportLayerTest {
-
- private static final int BUFFER_SIZE = 4 * 1024;
-
- private SSLEchoServer server;
- private Selector selector;
- private ChannelBuilder channelBuilder;
- private CertStores serverCertStores;
- private CertStores clientCertStores;
- private Map<String, Object> sslClientConfigs;
- private Map<String, Object> sslServerConfigs;
-
- @Before
- public void setup() throws Exception {
- // Create certificates for use by client and server. Add server cert to client truststore and vice versa.
- serverCertStores = new CertStores(true);
- clientCertStores = new CertStores(false);
- sslServerConfigs = serverCertStores.getTrustingConfig(clientCertStores);
- sslClientConfigs = clientCertStores.getTrustingConfig(serverCertStores);
-
- this.channelBuilder = new SSLChannelBuilder(Mode.CLIENT);
- this.channelBuilder.configure(sslClientConfigs);
- this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", new LinkedHashMap<String, String>(), channelBuilder);
- }
-
- @After
- public void teardown() throws Exception {
- if (selector != null)
- this.selector.close();
- if (server != null)
- this.server.close();
- }
-
- /**
- * Tests that server certificate with valid IP address is accepted by
- * a client that validates server endpoint.
- */
- @Test
- public void testValidEndpointIdentification() throws Exception {
- String node = "0";
- createEchoServer(sslServerConfigs);
- sslClientConfigs.put(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS");
- createSelector(sslClientConfigs);
- InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
- selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
-
- testClientConnection(node, 100, 10);
- }
-
- /**
- * Tests that server certificate with invalid IP address is not accepted by
- * a client that validates server endpoint. Certificate uses "localhost" as
- * common name, test uses host IP to trigger endpoint validation failure.
- */
- @Test
- public void testInvalidEndpointIdentification() throws Exception {
- String node = "0";
- String serverHost = InetAddress.getLocalHost().getHostAddress();
- server = new SSLEchoServer(sslServerConfigs, serverHost);
- server.start();
- sslClientConfigs.put(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS");
- createSelector(sslClientConfigs);
- InetSocketAddress addr = new InetSocketAddress(serverHost, server.port);
- selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
-
- waitForChannelClose(node);
- }
-
- /**
- * Tests that server certificate with invalid IP address is accepted by
- * a client that has disabled endpoint validation
- */
- @Test
- public void testEndpointIdentificationDisabled() throws Exception {
- String node = "0";
- String serverHost = InetAddress.getLocalHost().getHostAddress();
- server = new SSLEchoServer(sslServerConfigs, serverHost);
- server.start();
- sslClientConfigs.remove(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG);
- createSelector(sslClientConfigs);
- InetSocketAddress addr = new InetSocketAddress(serverHost, server.port);
- selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
-
- testClientConnection(node, 100, 10);
- }
-
- /**
- * Tests that server accepts connections from clients with a trusted certificate
- * when client authentication is required.
- */
- @Test
- public void testClientAuthenticationRequiredValidProvided() throws Exception {
- String node = "0";
- sslServerConfigs.put(SSLConfigs.SSL_CLIENT_AUTH_CONFIG, "required");
- createEchoServer(sslServerConfigs);
- createSelector(sslClientConfigs);
- InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
- selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
-
- testClientConnection(node, 100, 10);
- }
-
- /**
- * Tests that server does not accept connections from clients with an untrusted certificate
- * when client authentication is required.
- */
- @Test
- public void testClientAuthenticationRequiredUntrustedProvided() throws Exception {
- String node = "0";
- sslServerConfigs = serverCertStores.getUntrustingConfig();
- sslServerConfigs.put(SSLConfigs.SSL_CLIENT_AUTH_CONFIG, "required");
- createEchoServer(sslServerConfigs);
- createSelector(sslClientConfigs);
- InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
- selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
-
- waitForChannelClose(node);
- }
-
- /**
- * Tests that server does not accept connections from clients which dont
- * provide a certificate when client authentication is required.
- */
- @Test
- public void testClientAuthenticationRequiredNotProvided() throws Exception {
- String node = "0";
- sslServerConfigs.put(SSLConfigs.SSL_CLIENT_AUTH_CONFIG, "required");
- createEchoServer(sslServerConfigs);
-
- sslClientConfigs.remove(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
- sslClientConfigs.remove(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
- sslClientConfigs.remove(SSLConfigs.SSL_KEY_PASSWORD_CONFIG);
- createSelector(sslClientConfigs);
- InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
- selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
-
- waitForChannelClose(node);
- }
-
- /**
- * Tests that server accepts connections from a client configured
- * with an untrusted certificate if client authentication is disabled
- */
- @Test
- public void testClientAuthenticationDisabledUntrustedProvided() throws Exception {
- String node = "0";
- sslServerConfigs = serverCertStores.getUntrustingConfig();
- sslServerConfigs.put(SSLConfigs.SSL_CLIENT_AUTH_CONFIG, "none");
- createEchoServer(sslServerConfigs);
- createSelector(sslClientConfigs);
- InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
- selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
-
- testClientConnection(node, 100, 10);
- }
-
- /**
- * Tests that server accepts connections from a client that does not provide
- * a certificate if client authentication is disabled
- */
- @Test
- public void testClientAuthenticationDisabledNotProvided() throws Exception {
- String node = "0";
- sslServerConfigs.put(SSLConfigs.SSL_CLIENT_AUTH_CONFIG, "none");
- createEchoServer(sslServerConfigs);
-
- sslClientConfigs.remove(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
- sslClientConfigs.remove(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
- sslClientConfigs.remove(SSLConfigs.SSL_KEY_PASSWORD_CONFIG);
- createSelector(sslClientConfigs);
- InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
- selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
-
- testClientConnection(node, 100, 10);
- }
-
- /**
- * Tests that server accepts connections from a client configured
- * with a valid certificate if client authentication is requested
- */
- @Test
- public void testClientAuthenticationRequestedValidProvided() throws Exception {
- String node = "0";
- sslServerConfigs.put(SSLConfigs.SSL_CLIENT_AUTH_CONFIG, "requested");
- createEchoServer(sslServerConfigs);
- createSelector(sslClientConfigs);
- InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
- selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
-
- testClientConnection(node, 100, 10);
- }
-
- /**
- * Tests that server accepts connections from a client that does not provide
- * a certificate if client authentication is requested but not required
- */
- @Test
- public void testClientAuthenticationRequestedNotProvided() throws Exception {
- String node = "0";
- sslServerConfigs.put(SSLConfigs.SSL_CLIENT_AUTH_CONFIG, "requested");
- createEchoServer(sslServerConfigs);
-
- sslClientConfigs.remove(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
- sslClientConfigs.remove(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
- sslClientConfigs.remove(SSLConfigs.SSL_KEY_PASSWORD_CONFIG);
- createSelector(sslClientConfigs);
- InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
- selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
-
- testClientConnection(node, 100, 10);
- }
-
- /**
- * Tests that channels cannot be created if truststore cannot be loaded
- */
- @Test
- public void testInvalidTruststorePassword() throws Exception {
- SSLChannelBuilder channelBuilder = new SSLChannelBuilder(Mode.CLIENT);
- try {
- sslClientConfigs.put(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "invalid");
- channelBuilder.configure(sslClientConfigs);
- fail("SSL channel configured with invalid truststore password");
- } catch (KafkaException e) {
- // Expected exception
- }
- }
-
- /**
- * Tests that channels cannot be created if keystore cannot be loaded
- */
- @Test
- public void testInvalidKeystorePassword() throws Exception {
- SSLChannelBuilder channelBuilder = new SSLChannelBuilder(Mode.CLIENT);
- try {
- sslClientConfigs.put(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "invalid");
- channelBuilder.configure(sslClientConfigs);
- fail("SSL channel configured with invalid keystore password");
- } catch (KafkaException e) {
- // Expected exception
- }
- }
-
- /**
- * Tests that client connections cannot be created to a server
- * if key password is invalid
- */
- @Test
- public void testInvalidKeyPassword() throws Exception {
- String node = "0";
- sslServerConfigs.put(SSLConfigs.SSL_KEY_PASSWORD_CONFIG, "invalid");
- createEchoServer(sslServerConfigs);
- createSelector(sslClientConfigs);
- InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
- selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
-
- waitForChannelClose(node);
- }
-
- /**
- * Tests that connections cannot be made with unsupported TLS versions
- */
- @Test
- public void testUnsupportedTLSVersion() throws Exception {
- String node = "0";
- sslServerConfigs.put(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.2"));
- createEchoServer(sslServerConfigs);
-
- sslClientConfigs.put(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.1"));
- createSelector(sslClientConfigs);
- InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
- selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
-
- waitForChannelClose(node);
- }
-
- /**
- * Tests that connections cannot be made with unsupported TLS cipher suites
- */
- @Test
- public void testUnsupportedCiphers() throws Exception {
- String node = "0";
- String[] cipherSuites = SSLContext.getDefault().getDefaultSSLParameters().getCipherSuites();
- sslServerConfigs.put(SSLConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuites[0]));
- createEchoServer(sslServerConfigs);
-
- sslClientConfigs.put(SSLConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuites[1]));
- createSelector(sslClientConfigs);
- InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
- selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
-
- waitForChannelClose(node);
- }
-
- /**
- * Tests handling of BUFFER_UNDERFLOW during unwrap when network read buffer is smaller than SSL session packet buffer size.
- */
- @Test
- public void testNetReadBufferResize() throws Exception {
- String node = "0";
- createEchoServer(sslServerConfigs);
- createSelector(sslClientConfigs, 10, null, null);
- InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
- selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
-
- testClientConnection(node, 64000, 10);
- }
-
- /**
- * Tests handling of BUFFER_OVERFLOW during wrap when network write buffer is smaller than SSL session packet buffer size.
- */
- @Test
- public void testNetWriteBufferResize() throws Exception {
- String node = "0";
- createEchoServer(sslServerConfigs);
- createSelector(sslClientConfigs, null, 10, null);
- InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
- selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
-
- testClientConnection(node, 64000, 10);
- }
-
- /**
- * Tests handling of BUFFER_OVERFLOW during unwrap when application read buffer is smaller than SSL session application buffer size.
- */
- @Test
- public void testApplicationBufferResize() throws Exception {
- String node = "0";
- createEchoServer(sslServerConfigs);
- createSelector(sslClientConfigs, null, null, 10);
- InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
- selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
-
- testClientConnection(node, 64000, 10);
- }
-
- private void testClientConnection(String node, int minMessageSize, int messageCount) throws Exception {
-
- String prefix = TestUtils.randomString(minMessageSize);
- int requests = 0;
- int responses = 0;
- // wait for handshake to finish
- while (!selector.isChannelReady(node)) {
- selector.poll(1000L);
- }
- selector.send(new NetworkSend(node, ByteBuffer.wrap((prefix + "-0").getBytes())));
- requests++;
- while (responses < messageCount) {
- selector.poll(0L);
- assertEquals("No disconnects should have occurred.", 0, selector.disconnected().size());
-
- for (NetworkReceive receive : selector.completedReceives()) {
- assertEquals(prefix + "-" + responses, new String(Utils.toArray(receive.payload())));
- responses++;
- }
-
- for (int i = 0; i < selector.completedSends().size() && requests < messageCount && selector.isChannelReady(node); i++, requests++) {
- selector.send(new NetworkSend(node, ByteBuffer.wrap((prefix + "-" + requests).getBytes())));
- }
- }
- }
-
- private void waitForChannelClose(String node) throws IOException {
- boolean closed = false;
- for (int i = 0; i < 30; i++) {
- selector.poll(1000L);
- if (selector.channel(node) == null) {
- closed = true;
- break;
- }
- }
- assertTrue(closed);
- }
-
- private void createEchoServer(Map<String, Object> sslServerConfigs) throws Exception {
- server = new SSLEchoServer(sslServerConfigs, "localhost");
- server.start();
- }
-
- private void createSelector(Map<String, Object> sslClientConfigs) {
- createSelector(sslClientConfigs, null, null, null);
- }
-
- private void createSelector(Map<String, Object> sslClientConfigs, final Integer netReadBufSize, final Integer netWriteBufSize, final Integer appBufSize) {
-
- this.channelBuilder = new SSLChannelBuilder(Mode.CLIENT) {
-
- @Override
- protected SSLTransportLayer buildTransportLayer(SSLFactory sslFactory, String id, SelectionKey key) throws IOException {
- SocketChannel socketChannel = (SocketChannel) key.channel();
- SSLEngine sslEngine = sslFactory.createSSLEngine(socketChannel.socket().getInetAddress().getHostName(),
- socketChannel.socket().getPort());
- TestSSLTransportLayer transportLayer = new TestSSLTransportLayer(id, key, sslEngine, netReadBufSize, netWriteBufSize, appBufSize);
- transportLayer.startHandshake();
- return transportLayer;
- }
-
-
- };
- this.channelBuilder.configure(sslClientConfigs);
- this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", new LinkedHashMap<String, String>(), channelBuilder);
- }
-
- private static class CertStores {
-
- Map<String, Object> sslConfig;
-
- CertStores(boolean server) throws Exception {
- String name = server ? "server" : "client";
- Mode mode = server ? Mode.SERVER : Mode.CLIENT;
- File truststoreFile = File.createTempFile(name + "TS", ".jks");
- sslConfig = TestSSLUtils.createSSLConfig(!server, true, mode, truststoreFile, name);
- sslConfig.put(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
- }
-
- private Map<String, Object> getTrustingConfig(CertStores truststoreConfig) {
- Map<String, Object> config = new HashMap<String, Object>(sslConfig);
- config.put(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreConfig.sslConfig.get(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
- config.put(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststoreConfig.sslConfig.get(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
- config.put(SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, truststoreConfig.sslConfig.get(SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG));
- return config;
- }
-
- private Map<String, Object> getUntrustingConfig() {
- return sslConfig;
- }
- }
-
- /**
- * SSLTransportLayer with overrides for packet and application buffer size to test buffer resize
- * code path. The overridden buffer size starts with a small value and increases in size when the buffer
- * size is retrieved to handle overflow/underflow, until the actual session buffer size is reached.
- */
- private static class TestSSLTransportLayer extends SSLTransportLayer {
-
- private final ResizeableBufferSize netReadBufSize;
- private final ResizeableBufferSize netWriteBufSize;
- private final ResizeableBufferSize appBufSize;
-
- public TestSSLTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine,
- Integer netReadBufSize, Integer netWriteBufSize, Integer appBufSize) throws IOException {
- super(channelId, key, sslEngine, false);
- this.netReadBufSize = new ResizeableBufferSize(netReadBufSize);
- this.netWriteBufSize = new ResizeableBufferSize(netWriteBufSize);
- this.appBufSize = new ResizeableBufferSize(appBufSize);
- }
-
- @Override
- protected int netReadBufferSize() {
- ByteBuffer netReadBuffer = netReadBuffer();
- // netReadBufferSize() is invoked in SSLTransportLayer.read() prior to the read
- // operation. To avoid the read buffer being expanded too early, increase buffer size
- // only when read buffer is full. This ensures that BUFFER_UNDERFLOW is always
- // triggered in testNetReadBufferResize().
- boolean updateBufSize = netReadBuffer != null && !netReadBuffer().hasRemaining();
- return netReadBufSize.updateAndGet(super.netReadBufferSize(), updateBufSize);
- }
-
- @Override
- protected int netWriteBufferSize() {
- return netWriteBufSize.updateAndGet(super.netWriteBufferSize(), true);
- }
-
- @Override
- protected int applicationBufferSize() {
- return appBufSize.updateAndGet(super.applicationBufferSize(), true);
- }
-
- private static class ResizeableBufferSize {
- private Integer bufSizeOverride;
- ResizeableBufferSize(Integer bufSizeOverride) {
- this.bufSizeOverride = bufSizeOverride;
- }
- int updateAndGet(int actualSize, boolean update) {
- int size = actualSize;
- if (bufSizeOverride != null) {
- if (update)
- bufSizeOverride = Math.min(bufSizeOverride * 2, size);
- size = bufSizeOverride;
- }
- return size;
- }
- }
- }
-
- // Non-blocking EchoServer implementation that uses SSLTransportLayer
- private class SSLEchoServer extends Thread {
- private final int port;
- private final ServerSocketChannel serverSocketChannel;
- private final List<SocketChannel> newChannels;
- private final List<SocketChannel> socketChannels;
- private final AcceptorThread acceptorThread;
- private SSLFactory sslFactory;
- private final Selector selector;
- private final ConcurrentLinkedQueue<NetworkSend> inflightSends = new ConcurrentLinkedQueue<NetworkSend>();
-
- public SSLEchoServer(Map<String, ?> configs, String serverHost) throws Exception {
- this.sslFactory = new SSLFactory(Mode.SERVER);
- this.sslFactory.configure(configs);
- serverSocketChannel = ServerSocketChannel.open();
- serverSocketChannel.configureBlocking(false);
- serverSocketChannel.socket().bind(new InetSocketAddress(serverHost, 0));
- this.port = serverSocketChannel.socket().getLocalPort();
- this.socketChannels = Collections.synchronizedList(new ArrayList<SocketChannel>());
- this.newChannels = Collections.synchronizedList(new ArrayList<SocketChannel>());
- SSLChannelBuilder channelBuilder = new SSLChannelBuilder(Mode.SERVER);
- channelBuilder.configure(sslServerConfigs);
- this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", new LinkedHashMap<String, String>(), channelBuilder);
- setName("echoserver");
- setDaemon(true);
- acceptorThread = new AcceptorThread();
- }
-
- @Override
- public void run() {
- try {
- acceptorThread.start();
- while (serverSocketChannel.isOpen()) {
- selector.poll(1000);
- for (SocketChannel socketChannel : newChannels) {
- String id = id(socketChannel);
- selector.register(id, socketChannel);
- socketChannels.add(socketChannel);
- }
- newChannels.clear();
- while (true) {
- NetworkSend send = inflightSends.peek();
- if (send != null && !selector.channel(send.destination()).hasSend()) {
- send = inflightSends.poll();
- selector.send(send);
- } else
- break;
- }
- List<NetworkReceive> completedReceives = selector.completedReceives();
- for (NetworkReceive rcv : completedReceives) {
- NetworkSend send = new NetworkSend(rcv.source(), rcv.payload());
- if (!selector.channel(send.destination()).hasSend())
- selector.send(send);
- else
- inflightSends.add(send);
- }
- }
- } catch (IOException e) {
- // ignore
- }
- }
-
- private String id(SocketChannel channel) {
- return channel.socket().getLocalAddress().getHostAddress() + ":" + channel.socket().getLocalPort() + "-" +
- channel.socket().getInetAddress().getHostAddress() + ":" + channel.socket().getPort();
- }
-
- public void closeConnections() throws IOException {
- for (SocketChannel channel : socketChannels)
- channel.close();
- socketChannels.clear();
- }
-
- public void close() throws IOException, InterruptedException {
- this.serverSocketChannel.close();
- closeConnections();
- acceptorThread.interrupt();
- acceptorThread.join();
- interrupt();
- join();
- }
-
- private class AcceptorThread extends Thread {
- public AcceptorThread() throws IOException {
- setName("acceptor");
- }
- public void run() {
- try {
-
- java.nio.channels.Selector acceptSelector = java.nio.channels.Selector.open();
- serverSocketChannel.register(acceptSelector, SelectionKey.OP_ACCEPT);
- while (serverSocketChannel.isOpen()) {
- if (acceptSelector.select(1000) > 0) {
- Iterator<SelectionKey> it = acceptSelector.selectedKeys().iterator();
- while (it.hasNext()) {
- SelectionKey key = it.next();
- if (key.isAcceptable()) {
- SocketChannel socketChannel = ((ServerSocketChannel) key.channel()).accept();
- socketChannel.configureBlocking(false);
- newChannels.add(socketChannel);
- selector.wakeup();
- }
- }
- }
- }
- } catch (IOException e) {
- // ignore
- }
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index 683eeee..8ce0298 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -24,7 +24,7 @@ import java.nio.ByteBuffer;
import java.util.*;
import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.config.SSLConfigs;
+import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
@@ -49,7 +49,7 @@ public class SelectorTest {
@Before
public void setup() throws Exception {
Map<String, Object> configs = new HashMap<String, Object>();
- configs.put(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
+ configs.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
this.server = new EchoServer(configs);
this.server.start();
this.time = new MockTime();
http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
new file mode 100644
index 0000000..94c5654
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.security.ssl.SslFactory;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.test.TestSslUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * A set of tests for the selector. These use a test harness that runs a simple socket server that echos back responses.
+ */
+public class SslSelectorTest extends SelectorTest {
+
+ private Metrics metrics;
+ private Map<String, Object> sslClientConfigs;
+
+ @Before
+ public void setup() throws Exception {
+ File trustStoreFile = File.createTempFile("truststore", ".jks");
+
+ Map<String, Object> sslServerConfigs = TestSslUtils.createSslConfig(false, true, Mode.SERVER, trustStoreFile, "server");
+ sslServerConfigs.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
+ this.server = new EchoServer(sslServerConfigs);
+ this.server.start();
+ this.time = new MockTime();
+ sslClientConfigs = TestSslUtils.createSslConfig(false, false, Mode.SERVER, trustStoreFile, "client");
+ sslClientConfigs.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
+
+ this.channelBuilder = new SslChannelBuilder(Mode.CLIENT);
+ this.channelBuilder.configure(sslClientConfigs);
+ this.metrics = new Metrics();
+ this.selector = new Selector(5000, metrics, time, "MetricGroup", new LinkedHashMap<String, String>(), channelBuilder);
+ }
+
+ @After
+ public void teardown() throws Exception {
+ this.selector.close();
+ this.server.close();
+ this.metrics.close();
+ }
+
+ /**
+ * Tests that SSL renegotiation initiated by the server are handled correctly by the client
+ * @throws Exception
+ */
+ @Test
+ public void testRenegotiation() throws Exception {
+ ChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT) {
+ @Override
+ protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, String id, SelectionKey key) throws IOException {
+ SocketChannel socketChannel = (SocketChannel) key.channel();
+ SslTransportLayer transportLayer = new SslTransportLayer(id, key,
+ sslFactory.createSslEngine(socketChannel.socket().getInetAddress().getHostName(), socketChannel.socket().getPort()),
+ true);
+ transportLayer.startHandshake();
+ return transportLayer;
+ }
+ };
+ channelBuilder.configure(sslClientConfigs);
+ Selector selector = new Selector(5000, metrics, time, "MetricGroup2", new LinkedHashMap<String, String>(), channelBuilder);
+ try {
+ int reqs = 500;
+ String node = "0";
+ // create connections
+ InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+ selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+ // send echo requests and receive responses
+ int requests = 0;
+ int responses = 0;
+ int renegotiates = 0;
+ while (!selector.isChannelReady(node)) {
+ selector.poll(1000L);
+ }
+ selector.send(createSend(node, node + "-" + 0));
+ requests++;
+
+ // loop until we complete all requests
+ while (responses < reqs) {
+ selector.poll(0L);
+ if (responses >= 100 && renegotiates == 0) {
+ renegotiates++;
+ server.renegotiate();
+ }
+ assertEquals("No disconnects should have occurred.", 0, selector.disconnected().size());
+
+ // handle any responses we may have gotten
+ for (NetworkReceive receive : selector.completedReceives()) {
+ String[] pieces = asString(receive).split("-");
+ assertEquals("Should be in the form 'conn-counter'", 2, pieces.length);
+ assertEquals("Check the source", receive.source(), pieces[0]);
+ assertEquals("Check that the receive has kindly been rewound", 0, receive.payload().position());
+ assertEquals("Check the request counter", responses, Integer.parseInt(pieces[1]));
+ responses++;
+ }
+
+ // prepare new sends for the next round
+ for (int i = 0; i < selector.completedSends().size() && requests < reqs && selector.isChannelReady(node); i++, requests++) {
+ selector.send(createSend(node, node + "-" + requests));
+ }
+ }
+ } finally {
+ selector.close();
+ }
+ }
+
+ @Test
+ public void testDisabledRenegotiation() throws Exception {
+ String node = "0";
+ // create connections
+ InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+ selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+ // send echo requests and receive responses
+ while (!selector.isChannelReady(node)) {
+ selector.poll(1000L);
+ }
+ selector.send(createSend(node, node + "-" + 0));
+ selector.poll(0L);
+ server.renegotiate();
+ selector.send(createSend(node, node + "-" + 1));
+ long expiryTime = System.currentTimeMillis() + 2000;
+
+ List<String> disconnected = new ArrayList<>();
+ while (!disconnected.contains(node) && System.currentTimeMillis() < expiryTime) {
+ selector.poll(10);
+ disconnected.addAll(selector.disconnected());
+ }
+ assertTrue("Renegotiation should cause disconnection", disconnected.contains(node));
+
+ }
+
+ /**
+ * Connects and waits for handshake to complete. This is required since SSLTransportLayer
+ * implementation requires the channel to be ready before send is invoked (unlike plaintext
+ * where send can be invoked straight after connect)
+ */
+ protected void connect(String node, InetSocketAddress serverAddr) throws IOException {
+ blockingConnect(node, serverAddr);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
new file mode 100644
index 0000000..91bd47c
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -0,0 +1,651 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.io.IOException;
+import java.io.File;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.security.ssl.SslFactory;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.TestSslUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for the SSL transport layer. These use a test harness that runs a simple socket server that echos back responses.
+ */
+public class SslTransportLayerTest {
+
+ private static final int BUFFER_SIZE = 4 * 1024;
+
+ private SslEchoServer server;
+ private Selector selector;
+ private ChannelBuilder channelBuilder;
+ private CertStores serverCertStores;
+ private CertStores clientCertStores;
+ private Map<String, Object> sslClientConfigs;
+ private Map<String, Object> sslServerConfigs;
+
+ @Before
+ public void setup() throws Exception {
+ // Create certificates for use by client and server. Add server cert to client truststore and vice versa.
+ serverCertStores = new CertStores(true);
+ clientCertStores = new CertStores(false);
+ sslServerConfigs = serverCertStores.getTrustingConfig(clientCertStores);
+ sslClientConfigs = clientCertStores.getTrustingConfig(serverCertStores);
+
+ this.channelBuilder = new SslChannelBuilder(Mode.CLIENT);
+ this.channelBuilder.configure(sslClientConfigs);
+ this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", new LinkedHashMap<String, String>(), channelBuilder);
+ }
+
+ @After
+ public void teardown() throws Exception {
+ if (selector != null)
+ this.selector.close();
+ if (server != null)
+ this.server.close();
+ }
+
+ /**
+ * Tests that server certificate with valid IP address is accepted by
+ * a client that validates server endpoint.
+ */
+ @Test
+ public void testValidEndpointIdentification() throws Exception {
+ String node = "0";
+ createEchoServer(sslServerConfigs);
+ sslClientConfigs.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS");
+ createSelector(sslClientConfigs);
+ InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+ selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+ testClientConnection(node, 100, 10);
+ }
+
+ /**
+ * Tests that server certificate with invalid IP address is not accepted by
+ * a client that validates server endpoint. Certificate uses "localhost" as
+ * common name, test uses host IP to trigger endpoint validation failure.
+ */
+ @Test
+ public void testInvalidEndpointIdentification() throws Exception {
+ String node = "0";
+ String serverHost = InetAddress.getLocalHost().getHostAddress();
+ server = new SslEchoServer(sslServerConfigs, serverHost);
+ server.start();
+ sslClientConfigs.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS");
+ createSelector(sslClientConfigs);
+ InetSocketAddress addr = new InetSocketAddress(serverHost, server.port);
+ selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+ waitForChannelClose(node);
+ }
+
+ /**
+ * Tests that server certificate with invalid IP address is accepted by
+ * a client that has disabled endpoint validation
+ */
+ @Test
+ public void testEndpointIdentificationDisabled() throws Exception {
+ String node = "0";
+ String serverHost = InetAddress.getLocalHost().getHostAddress();
+ server = new SslEchoServer(sslServerConfigs, serverHost);
+ server.start();
+ sslClientConfigs.remove(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG);
+ createSelector(sslClientConfigs);
+ InetSocketAddress addr = new InetSocketAddress(serverHost, server.port);
+ selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+ testClientConnection(node, 100, 10);
+ }
+
+ /**
+ * Tests that server accepts connections from clients with a trusted certificate
+ * when client authentication is required.
+ */
+ @Test
+ public void testClientAuthenticationRequiredValidProvided() throws Exception {
+ String node = "0";
+ sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required");
+ createEchoServer(sslServerConfigs);
+ createSelector(sslClientConfigs);
+ InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+ selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+ testClientConnection(node, 100, 10);
+ }
+
+ /**
+ * Tests that server does not accept connections from clients with an untrusted certificate
+ * when client authentication is required.
+ */
+ @Test
+ public void testClientAuthenticationRequiredUntrustedProvided() throws Exception {
+ String node = "0";
+ sslServerConfigs = serverCertStores.getUntrustingConfig();
+ sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required");
+ createEchoServer(sslServerConfigs);
+ createSelector(sslClientConfigs);
+ InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+ selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+ waitForChannelClose(node);
+ }
+
+ /**
+ * Tests that server does not accept connections from clients which dont
+ * provide a certificate when client authentication is required.
+ */
+ @Test
+ public void testClientAuthenticationRequiredNotProvided() throws Exception {
+ String node = "0";
+ sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required");
+ createEchoServer(sslServerConfigs);
+
+ sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
+ sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
+ sslClientConfigs.remove(SslConfigs.SSL_KEY_PASSWORD_CONFIG);
+ createSelector(sslClientConfigs);
+ InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+ selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+ waitForChannelClose(node);
+ }
+
+ /**
+ * Tests that server accepts connections from a client configured
+ * with an untrusted certificate if client authentication is disabled
+ */
+ @Test
+ public void testClientAuthenticationDisabledUntrustedProvided() throws Exception {
+ String node = "0";
+ sslServerConfigs = serverCertStores.getUntrustingConfig();
+ sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "none");
+ createEchoServer(sslServerConfigs);
+ createSelector(sslClientConfigs);
+ InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+ selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+ testClientConnection(node, 100, 10);
+ }
+
+ /**
+ * Tests that server accepts connections from a client that does not provide
+ * a certificate if client authentication is disabled
+ */
+ @Test
+ public void testClientAuthenticationDisabledNotProvided() throws Exception {
+ String node = "0";
+ sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "none");
+ createEchoServer(sslServerConfigs);
+
+ sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
+ sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
+ sslClientConfigs.remove(SslConfigs.SSL_KEY_PASSWORD_CONFIG);
+ createSelector(sslClientConfigs);
+ InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+ selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+ testClientConnection(node, 100, 10);
+ }
+
+ /**
+ * Tests that server accepts connections from a client configured
+ * with a valid certificate if client authentication is requested
+ */
+ @Test
+ public void testClientAuthenticationRequestedValidProvided() throws Exception {
+ String node = "0";
+ sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "requested");
+ createEchoServer(sslServerConfigs);
+ createSelector(sslClientConfigs);
+ InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+ selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+ testClientConnection(node, 100, 10);
+ }
+
+ /**
+ * Tests that server accepts connections from a client that does not provide
+ * a certificate if client authentication is requested but not required
+ */
+ @Test
+ public void testClientAuthenticationRequestedNotProvided() throws Exception {
+ String node = "0";
+ sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "requested");
+ createEchoServer(sslServerConfigs);
+
+ sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
+ sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
+ sslClientConfigs.remove(SslConfigs.SSL_KEY_PASSWORD_CONFIG);
+ createSelector(sslClientConfigs);
+ InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+ selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+ testClientConnection(node, 100, 10);
+ }
+
+ /**
+ * Tests that channels cannot be created if truststore cannot be loaded
+ */
+ @Test
+ public void testInvalidTruststorePassword() throws Exception {
+ SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT);
+ try {
+ sslClientConfigs.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "invalid");
+ channelBuilder.configure(sslClientConfigs);
+ fail("SSL channel configured with invalid truststore password");
+ } catch (KafkaException e) {
+ // Expected exception
+ }
+ }
+
+ /**
+ * Tests that channels cannot be created if keystore cannot be loaded
+ */
+ @Test
+ public void testInvalidKeystorePassword() throws Exception {
+ SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT);
+ try {
+ sslClientConfigs.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "invalid");
+ channelBuilder.configure(sslClientConfigs);
+ fail("SSL channel configured with invalid keystore password");
+ } catch (KafkaException e) {
+ // Expected exception
+ }
+ }
+
+ /**
+ * Tests that client connections cannot be created to a server
+ * if key password is invalid
+ */
+ @Test
+ public void testInvalidKeyPassword() throws Exception {
+ String node = "0";
+ sslServerConfigs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "invalid");
+ createEchoServer(sslServerConfigs);
+ createSelector(sslClientConfigs);
+ InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+ selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+ waitForChannelClose(node);
+ }
+
+ /**
+ * Tests that connections cannot be made with unsupported TLS versions
+ */
+ @Test
+ public void testUnsupportedTLSVersion() throws Exception {
+ String node = "0";
+ sslServerConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.2"));
+ createEchoServer(sslServerConfigs);
+
+ sslClientConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.1"));
+ createSelector(sslClientConfigs);
+ InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+ selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+ waitForChannelClose(node);
+ }
+
+ /**
+ * Tests that connections cannot be made with unsupported TLS cipher suites
+ */
+ @Test
+ public void testUnsupportedCiphers() throws Exception {
+ String node = "0";
+ String[] cipherSuites = SSLContext.getDefault().getDefaultSSLParameters().getCipherSuites();
+ sslServerConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuites[0]));
+ createEchoServer(sslServerConfigs);
+
+ sslClientConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuites[1]));
+ createSelector(sslClientConfigs);
+ InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+ selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+ waitForChannelClose(node);
+ }
+
+ /**
+ * Tests handling of BUFFER_UNDERFLOW during unwrap when network read buffer is smaller than SSL session packet buffer size.
+ */
+ @Test
+ public void testNetReadBufferResize() throws Exception {
+ String node = "0";
+ createEchoServer(sslServerConfigs);
+ createSelector(sslClientConfigs, 10, null, null);
+ InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+ selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+ testClientConnection(node, 64000, 10);
+ }
+
+ /**
+ * Tests handling of BUFFER_OVERFLOW during wrap when network write buffer is smaller than SSL session packet buffer size.
+ */
+ @Test
+ public void testNetWriteBufferResize() throws Exception {
+ String node = "0";
+ createEchoServer(sslServerConfigs);
+ createSelector(sslClientConfigs, null, 10, null);
+ InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+ selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+ testClientConnection(node, 64000, 10);
+ }
+
+ /**
+ * Tests handling of BUFFER_OVERFLOW during unwrap when application read buffer is smaller than SSL session application buffer size.
+ */
+ @Test
+ public void testApplicationBufferResize() throws Exception {
+ String node = "0";
+ createEchoServer(sslServerConfigs);
+ createSelector(sslClientConfigs, null, null, 10);
+ InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+ selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+ testClientConnection(node, 64000, 10);
+ }
+
+ private void testClientConnection(String node, int minMessageSize, int messageCount) throws Exception {
+
+ String prefix = TestUtils.randomString(minMessageSize);
+ int requests = 0;
+ int responses = 0;
+ // wait for handshake to finish
+ while (!selector.isChannelReady(node)) {
+ selector.poll(1000L);
+ }
+ selector.send(new NetworkSend(node, ByteBuffer.wrap((prefix + "-0").getBytes())));
+ requests++;
+ while (responses < messageCount) {
+ selector.poll(0L);
+ assertEquals("No disconnects should have occurred.", 0, selector.disconnected().size());
+
+ for (NetworkReceive receive : selector.completedReceives()) {
+ assertEquals(prefix + "-" + responses, new String(Utils.toArray(receive.payload())));
+ responses++;
+ }
+
+ for (int i = 0; i < selector.completedSends().size() && requests < messageCount && selector.isChannelReady(node); i++, requests++) {
+ selector.send(new NetworkSend(node, ByteBuffer.wrap((prefix + "-" + requests).getBytes())));
+ }
+ }
+ }
+
+ private void waitForChannelClose(String node) throws IOException {
+ boolean closed = false;
+ for (int i = 0; i < 30; i++) {
+ selector.poll(1000L);
+ if (selector.channel(node) == null) {
+ closed = true;
+ break;
+ }
+ }
+ assertTrue(closed);
+ }
+
+ private void createEchoServer(Map<String, Object> sslServerConfigs) throws Exception {
+ server = new SslEchoServer(sslServerConfigs, "localhost");
+ server.start();
+ }
+
+ private void createSelector(Map<String, Object> sslClientConfigs) {
+ createSelector(sslClientConfigs, null, null, null);
+ }
+
+ private void createSelector(Map<String, Object> sslClientConfigs, final Integer netReadBufSize, final Integer netWriteBufSize, final Integer appBufSize) {
+
+ this.channelBuilder = new SslChannelBuilder(Mode.CLIENT) {
+
+ @Override
+ protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, String id, SelectionKey key) throws IOException {
+ SocketChannel socketChannel = (SocketChannel) key.channel();
+ SSLEngine sslEngine = sslFactory.createSslEngine(socketChannel.socket().getInetAddress().getHostName(),
+ socketChannel.socket().getPort());
+ TestSslTransportLayer transportLayer = new TestSslTransportLayer(id, key, sslEngine, netReadBufSize, netWriteBufSize, appBufSize);
+ transportLayer.startHandshake();
+ return transportLayer;
+ }
+
+
+ };
+ this.channelBuilder.configure(sslClientConfigs);
+ this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", new LinkedHashMap<String, String>(), channelBuilder);
+ }
+
+ private static class CertStores {
+
+ Map<String, Object> sslConfig;
+
+ CertStores(boolean server) throws Exception {
+ String name = server ? "server" : "client";
+ Mode mode = server ? Mode.SERVER : Mode.CLIENT;
+ File truststoreFile = File.createTempFile(name + "TS", ".jks");
+ sslConfig = TestSslUtils.createSslConfig(!server, true, mode, truststoreFile, name);
+ sslConfig.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
+ }
+
+ private Map<String, Object> getTrustingConfig(CertStores truststoreConfig) {
+ Map<String, Object> config = new HashMap<String, Object>(sslConfig);
+ config.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreConfig.sslConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
+ config.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststoreConfig.sslConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
+ config.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, truststoreConfig.sslConfig.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG));
+ return config;
+ }
+
+ private Map<String, Object> getUntrustingConfig() {
+ return sslConfig;
+ }
+ }
+
+ /**
+ * SSLTransportLayer with overrides for packet and application buffer size to test buffer resize
+ * code path. The overridden buffer size starts with a small value and increases in size when the buffer
+ * size is retrieved to handle overflow/underflow, until the actual session buffer size is reached.
+ */
+ private static class TestSslTransportLayer extends SslTransportLayer {
+
+ private final ResizeableBufferSize netReadBufSize;
+ private final ResizeableBufferSize netWriteBufSize;
+ private final ResizeableBufferSize appBufSize;
+
+ public TestSslTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine,
+ Integer netReadBufSize, Integer netWriteBufSize, Integer appBufSize) throws IOException {
+ super(channelId, key, sslEngine, false);
+ this.netReadBufSize = new ResizeableBufferSize(netReadBufSize);
+ this.netWriteBufSize = new ResizeableBufferSize(netWriteBufSize);
+ this.appBufSize = new ResizeableBufferSize(appBufSize);
+ }
+
+ @Override
+ protected int netReadBufferSize() {
+ ByteBuffer netReadBuffer = netReadBuffer();
+ // netReadBufferSize() is invoked in SSLTransportLayer.read() prior to the read
+ // operation. To avoid the read buffer being expanded too early, increase buffer size
+ // only when read buffer is full. This ensures that BUFFER_UNDERFLOW is always
+ // triggered in testNetReadBufferResize().
+ boolean updateBufSize = netReadBuffer != null && !netReadBuffer().hasRemaining();
+ return netReadBufSize.updateAndGet(super.netReadBufferSize(), updateBufSize);
+ }
+
+ @Override
+ protected int netWriteBufferSize() {
+ return netWriteBufSize.updateAndGet(super.netWriteBufferSize(), true);
+ }
+
+ @Override
+ protected int applicationBufferSize() {
+ return appBufSize.updateAndGet(super.applicationBufferSize(), true);
+ }
+
+ private static class ResizeableBufferSize {
+ private Integer bufSizeOverride;
+ ResizeableBufferSize(Integer bufSizeOverride) {
+ this.bufSizeOverride = bufSizeOverride;
+ }
+ int updateAndGet(int actualSize, boolean update) {
+ int size = actualSize;
+ if (bufSizeOverride != null) {
+ if (update)
+ bufSizeOverride = Math.min(bufSizeOverride * 2, size);
+ size = bufSizeOverride;
+ }
+ return size;
+ }
+ }
+ }
+
+ // Non-blocking EchoServer implementation that uses SSLTransportLayer
+ private class SslEchoServer extends Thread {
+ private final int port;
+ private final ServerSocketChannel serverSocketChannel;
+ private final List<SocketChannel> newChannels;
+ private final List<SocketChannel> socketChannels;
+ private final AcceptorThread acceptorThread;
+ private SslFactory sslFactory;
+ private final Selector selector;
+ private final ConcurrentLinkedQueue<NetworkSend> inflightSends = new ConcurrentLinkedQueue<NetworkSend>();
+
+ public SslEchoServer(Map<String, ?> configs, String serverHost) throws Exception {
+ this.sslFactory = new SslFactory(Mode.SERVER);
+ this.sslFactory.configure(configs);
+ serverSocketChannel = ServerSocketChannel.open();
+ serverSocketChannel.configureBlocking(false);
+ serverSocketChannel.socket().bind(new InetSocketAddress(serverHost, 0));
+ this.port = serverSocketChannel.socket().getLocalPort();
+ this.socketChannels = Collections.synchronizedList(new ArrayList<SocketChannel>());
+ this.newChannels = Collections.synchronizedList(new ArrayList<SocketChannel>());
+ SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.SERVER);
+ channelBuilder.configure(sslServerConfigs);
+ this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", new LinkedHashMap<String, String>(), channelBuilder);
+ setName("echoserver");
+ setDaemon(true);
+ acceptorThread = new AcceptorThread();
+ }
+
+ @Override
+ public void run() {
+ try {
+ acceptorThread.start();
+ while (serverSocketChannel.isOpen()) {
+ selector.poll(1000);
+ for (SocketChannel socketChannel : newChannels) {
+ String id = id(socketChannel);
+ selector.register(id, socketChannel);
+ socketChannels.add(socketChannel);
+ }
+ newChannels.clear();
+ while (true) {
+ NetworkSend send = inflightSends.peek();
+ if (send != null && !selector.channel(send.destination()).hasSend()) {
+ send = inflightSends.poll();
+ selector.send(send);
+ } else
+ break;
+ }
+ List<NetworkReceive> completedReceives = selector.completedReceives();
+ for (NetworkReceive rcv : completedReceives) {
+ NetworkSend send = new NetworkSend(rcv.source(), rcv.payload());
+ if (!selector.channel(send.destination()).hasSend())
+ selector.send(send);
+ else
+ inflightSends.add(send);
+ }
+ }
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+
+ private String id(SocketChannel channel) {
+ return channel.socket().getLocalAddress().getHostAddress() + ":" + channel.socket().getLocalPort() + "-" +
+ channel.socket().getInetAddress().getHostAddress() + ":" + channel.socket().getPort();
+ }
+
+ public void closeConnections() throws IOException {
+ for (SocketChannel channel : socketChannels)
+ channel.close();
+ socketChannels.clear();
+ }
+
+ public void close() throws IOException, InterruptedException {
+ this.serverSocketChannel.close();
+ closeConnections();
+ acceptorThread.interrupt();
+ acceptorThread.join();
+ interrupt();
+ join();
+ }
+
+ private class AcceptorThread extends Thread {
+ public AcceptorThread() throws IOException {
+ setName("acceptor");
+ }
+ public void run() {
+ try {
+
+ java.nio.channels.Selector acceptSelector = java.nio.channels.Selector.open();
+ serverSocketChannel.register(acceptSelector, SelectionKey.OP_ACCEPT);
+ while (serverSocketChannel.isOpen()) {
+ if (acceptSelector.select(1000) > 0) {
+ Iterator<SelectionKey> it = acceptSelector.selectedKeys().iterator();
+ while (it.hasNext()) {
+ SelectionKey key = it.next();
+ if (key.isAcceptable()) {
+ SocketChannel socketChannel = ((ServerSocketChannel) key.channel()).accept();
+ socketChannel.configureBlocking(false);
+ newChannels.add(socketChannel);
+ selector.wakeup();
+ }
+ }
+ }
+ }
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/clients/src/test/java/org/apache/kafka/common/security/ssl/SSLFactoryTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/SSLFactoryTest.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/SSLFactoryTest.java
deleted file mode 100644
index e90ec2b..0000000
--- a/clients/src/test/java/org/apache/kafka/common/security/ssl/SSLFactoryTest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.common.security.ssl;
-
-import javax.net.ssl.*;
-
-import java.io.File;
-import java.util.Map;
-
-import org.apache.kafka.test.TestSSLUtils;
-import org.apache.kafka.common.network.Mode;
-
-import org.junit.Test;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertTrue;
-
-
-/**
- * A set of tests for the selector over ssl. These use a test harness that runs a simple socket server that echos back responses.
- */
-
-public class SSLFactoryTest {
-
- @Test
- public void testSSLFactoryConfiguration() throws Exception {
- File trustStoreFile = File.createTempFile("truststore", ".jks");
- Map<String, Object> serverSSLConfig = TestSSLUtils.createSSLConfig(false, true, Mode.SERVER, trustStoreFile, "server");
- SSLFactory sslFactory = new SSLFactory(Mode.SERVER);
- sslFactory.configure(serverSSLConfig);
- //host and port are hints
- SSLEngine engine = sslFactory.createSSLEngine("localhost", 0);
- assertNotNull(engine);
- String[] expectedProtocols = {"TLSv1.2"};
- assertArrayEquals(expectedProtocols, engine.getEnabledProtocols());
- assertEquals(false, engine.getUseClientMode());
- }
-
- @Test
- public void testClientMode() throws Exception {
- File trustStoreFile = File.createTempFile("truststore", ".jks");
- Map<String, Object> clientSSLConfig = TestSSLUtils.createSSLConfig(false, true, Mode.CLIENT, trustStoreFile, "client");
- SSLFactory sslFactory = new SSLFactory(Mode.CLIENT);
- sslFactory.configure(clientSSLConfig);
- //host and port are hints
- SSLEngine engine = sslFactory.createSSLEngine("localhost", 0);
- assertTrue(engine.getUseClientMode());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
new file mode 100644
index 0000000..b5710aa
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.security.ssl;
+
+import javax.net.ssl.*;
+
+import java.io.File;
+import java.util.Map;
+
+import org.apache.kafka.test.TestSslUtils;
+import org.apache.kafka.common.network.Mode;
+
+import org.junit.Test;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * A set of tests for the selector over ssl. These use a test harness that runs a simple socket server that echos back responses.
+ */
+
+public class SslFactoryTest {
+
+ @Test
+ public void testSslFactoryConfiguration() throws Exception {
+ File trustStoreFile = File.createTempFile("truststore", ".jks");
+ Map<String, Object> serverSslConfig = TestSslUtils.createSslConfig(false, true, Mode.SERVER, trustStoreFile, "server");
+ SslFactory sslFactory = new SslFactory(Mode.SERVER);
+ sslFactory.configure(serverSslConfig);
+ //host and port are hints
+ SSLEngine engine = sslFactory.createSslEngine("localhost", 0);
+ assertNotNull(engine);
+ String[] expectedProtocols = {"TLSv1.2"};
+ assertArrayEquals(expectedProtocols, engine.getEnabledProtocols());
+ assertEquals(false, engine.getUseClientMode());
+ }
+
+ @Test
+ public void testClientMode() throws Exception {
+ File trustStoreFile = File.createTempFile("truststore", ".jks");
+ Map<String, Object> clientSslConfig = TestSslUtils.createSslConfig(false, true, Mode.CLIENT, trustStoreFile, "client");
+ SslFactory sslFactory = new SslFactory(Mode.CLIENT);
+ sslFactory.configure(clientSslConfig);
+ //host and port are hints
+ SSLEngine engine = sslFactory.createSslEngine("localhost", 0);
+ assertTrue(engine.getUseClientMode());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/clients/src/test/java/org/apache/kafka/test/TestSSLUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestSSLUtils.java b/clients/src/test/java/org/apache/kafka/test/TestSSLUtils.java
deleted file mode 100644
index b231692..0000000
--- a/clients/src/test/java/org/apache/kafka/test/TestSSLUtils.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.test;
-
-import org.apache.kafka.common.config.SSLConfigs;
-import org.apache.kafka.common.network.Mode;
-import org.apache.kafka.clients.CommonClientConfigs;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.EOFException;
-import java.math.BigInteger;
-import javax.net.ssl.TrustManagerFactory;
-import java.security.*;
-import java.security.cert.Certificate;
-import java.security.cert.CertificateException;
-import java.security.cert.X509Certificate;
-
-import org.bouncycastle.asn1.x500.X500Name;
-import org.bouncycastle.asn1.x509.AlgorithmIdentifier;
-import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo;
-import org.bouncycastle.cert.X509CertificateHolder;
-import org.bouncycastle.cert.X509v1CertificateBuilder;
-import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter;
-import org.bouncycastle.crypto.params.AsymmetricKeyParameter;
-import org.bouncycastle.crypto.util.PrivateKeyFactory;
-import org.bouncycastle.jce.provider.BouncyCastleProvider;
-import org.bouncycastle.operator.ContentSigner;
-import org.bouncycastle.operator.DefaultDigestAlgorithmIdentifierFinder;
-import org.bouncycastle.operator.DefaultSignatureAlgorithmIdentifierFinder;
-import org.bouncycastle.operator.bc.BcRSAContentSignerBuilder;
-
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.List;
-import java.util.ArrayList;
-
-
-public class TestSSLUtils {
-
- /**
- * Create a self-signed X.509 Certificate.
- * From http://bfo.com/blog/2011/03/08/odds_and_ends_creating_a_new_x_509_certificate.html.
- *
- * @param dn the X.509 Distinguished Name, eg "CN=Test, L=London, C=GB"
- * @param pair the KeyPair
- * @param days how many days from now the Certificate is valid for
- * @param algorithm the signing algorithm, eg "SHA1withRSA"
- * @return the self-signed certificate
- * @throws CertificateException thrown if a security error or an IO error ocurred.
- */
- public static X509Certificate generateCertificate(String dn, KeyPair pair,
- int days, String algorithm)
- throws CertificateException {
-
- try {
- Security.addProvider(new BouncyCastleProvider());
- AlgorithmIdentifier sigAlgId = new DefaultSignatureAlgorithmIdentifierFinder().find(algorithm);
- AlgorithmIdentifier digAlgId = new DefaultDigestAlgorithmIdentifierFinder().find(sigAlgId);
- AsymmetricKeyParameter privateKeyAsymKeyParam = PrivateKeyFactory.createKey(pair.getPrivate().getEncoded());
- SubjectPublicKeyInfo subPubKeyInfo = SubjectPublicKeyInfo.getInstance(pair.getPublic().getEncoded());
- ContentSigner sigGen = new BcRSAContentSignerBuilder(sigAlgId, digAlgId).build(privateKeyAsymKeyParam);
- X500Name name = new X500Name(dn);
- Date from = new Date();
- Date to = new Date(from.getTime() + days * 86400000L);
- BigInteger sn = new BigInteger(64, new SecureRandom());
-
- X509v1CertificateBuilder v1CertGen = new X509v1CertificateBuilder(name, sn, from, to, name, subPubKeyInfo);
- X509CertificateHolder certificateHolder = v1CertGen.build(sigGen);
- return new JcaX509CertificateConverter().setProvider("BC").getCertificate(certificateHolder);
- } catch (CertificateException ce) {
- throw ce;
- } catch (Exception e) {
- throw new CertificateException(e);
- }
- }
-
- public static KeyPair generateKeyPair(String algorithm) throws NoSuchAlgorithmException {
- KeyPairGenerator keyGen = KeyPairGenerator.getInstance(algorithm);
- keyGen.initialize(1024);
- return keyGen.genKeyPair();
- }
-
- private static KeyStore createEmptyKeyStore() throws GeneralSecurityException, IOException {
- KeyStore ks = KeyStore.getInstance("JKS");
- ks.load(null, null); // initialize
- return ks;
- }
-
- private static void saveKeyStore(KeyStore ks, String filename,
- String password) throws GeneralSecurityException, IOException {
- FileOutputStream out = new FileOutputStream(filename);
- try {
- ks.store(out, password.toCharArray());
- } finally {
- out.close();
- }
- }
-
- public static void createKeyStore(String filename,
- String password, String alias,
- Key privateKey, Certificate cert) throws GeneralSecurityException, IOException {
- KeyStore ks = createEmptyKeyStore();
- ks.setKeyEntry(alias, privateKey, password.toCharArray(),
- new Certificate[]{cert});
- saveKeyStore(ks, filename, password);
- }
-
- /**
- * Creates a keystore with a single key and saves it to a file.
- *
- * @param filename String file to save
- * @param password String store password to set on keystore
- * @param keyPassword String key password to set on key
- * @param alias String alias to use for the key
- * @param privateKey Key to save in keystore
- * @param cert Certificate to use as certificate chain associated to key
- * @throws GeneralSecurityException for any error with the security APIs
- * @throws IOException if there is an I/O error saving the file
- */
- public static void createKeyStore(String filename,
- String password, String keyPassword, String alias,
- Key privateKey, Certificate cert) throws GeneralSecurityException, IOException {
- KeyStore ks = createEmptyKeyStore();
- ks.setKeyEntry(alias, privateKey, keyPassword.toCharArray(),
- new Certificate[]{cert});
- saveKeyStore(ks, filename, password);
- }
-
- public static void createTrustStore(String filename,
- String password, String alias,
- Certificate cert) throws GeneralSecurityException, IOException {
- KeyStore ks = createEmptyKeyStore();
- ks.setCertificateEntry(alias, cert);
- saveKeyStore(ks, filename, password);
- }
-
- public static <T extends Certificate> void createTrustStore(
- String filename, String password, Map<String, T> certs) throws GeneralSecurityException, IOException {
- KeyStore ks = KeyStore.getInstance("JKS");
- try {
- FileInputStream in = new FileInputStream(filename);
- ks.load(in, password.toCharArray());
- in.close();
- } catch (EOFException e) {
- ks = createEmptyKeyStore();
- }
- for (Map.Entry<String, T> cert : certs.entrySet()) {
- ks.setCertificateEntry(cert.getKey(), cert.getValue());
- }
- saveKeyStore(ks, filename, password);
- }
-
- public static Map<String, X509Certificate> createX509Certificates(KeyPair keyPair)
- throws GeneralSecurityException {
- Map<String, X509Certificate> certs = new HashMap<String, X509Certificate>();
- X509Certificate cert = generateCertificate("CN=localhost, O=localhost", keyPair, 30, "SHA1withRSA");
- certs.put("localhost", cert);
- return certs;
- }
-
- public static Map<String, Object> createSSLConfig(Mode mode, File keyStoreFile, String password, String keyPassword,
- File trustStoreFile, String trustStorePassword) {
- Map<String, Object> sslConfigs = new HashMap<String, Object>();
- sslConfigs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); // kafka security protocol
- sslConfigs.put(SSLConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2"); // protocol to create SSLContext
-
- if (mode == Mode.SERVER || (mode == Mode.CLIENT && keyStoreFile != null)) {
- sslConfigs.put(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStoreFile.getPath());
- sslConfigs.put(SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
- sslConfigs.put(SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, TrustManagerFactory.getDefaultAlgorithm());
- sslConfigs.put(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, password);
- sslConfigs.put(SSLConfigs.SSL_KEY_PASSWORD_CONFIG, keyPassword);
- }
-
- sslConfigs.put(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreFile.getPath());
- sslConfigs.put(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePassword);
- sslConfigs.put(SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");
- sslConfigs.put(SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, TrustManagerFactory.getDefaultAlgorithm());
-
- List<String> enabledProtocols = new ArrayList<String>();
- enabledProtocols.add("TLSv1.2");
- sslConfigs.put(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, enabledProtocols);
-
- return sslConfigs;
- }
-
- public static Map<String, Object> createSSLConfig(boolean useClientCert, boolean trustStore, Mode mode, File trustStoreFile, String certAlias)
- throws IOException, GeneralSecurityException {
- Map<String, X509Certificate> certs = new HashMap<String, X509Certificate>();
- File keyStoreFile;
- String password;
-
- if (mode == Mode.SERVER)
- password = "ServerPassword";
- else
- password = "ClientPassword";
-
- String trustStorePassword = "TrustStorePassword";
-
- if (useClientCert) {
- keyStoreFile = File.createTempFile("clientKS", ".jks");
- KeyPair cKP = generateKeyPair("RSA");
- X509Certificate cCert = generateCertificate("CN=localhost, O=client", cKP, 30, "SHA1withRSA");
- createKeyStore(keyStoreFile.getPath(), password, "client", cKP.getPrivate(), cCert);
- certs.put(certAlias, cCert);
- } else {
- keyStoreFile = File.createTempFile("serverKS", ".jks");
- KeyPair sKP = generateKeyPair("RSA");
- X509Certificate sCert = generateCertificate("CN=localhost, O=server", sKP, 30,
- "SHA1withRSA");
- createKeyStore(keyStoreFile.getPath(), password, password, "server", sKP.getPrivate(), sCert);
- certs.put(certAlias, sCert);
- }
-
- if (trustStore) {
- createTrustStore(trustStoreFile.getPath(), trustStorePassword, certs);
- }
-
- Map<String, Object> sslConfig = createSSLConfig(mode, keyStoreFile, password,
- password, trustStoreFile, trustStorePassword);
- return sslConfig;
- }
-
-}