You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sr...@apache.org on 2015/10/24 18:45:05 UTC

[2/4] kafka git commit: KAFKA-2460; Fix capitalisation in SSL classes

http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/clients/src/test/java/org/apache/kafka/common/network/SSLTransportLayerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SSLTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SSLTransportLayerTest.java
deleted file mode 100644
index ebb59b5..0000000
--- a/clients/src/test/java/org/apache/kafka/common/network/SSLTransportLayerTest.java
+++ /dev/null
@@ -1,652 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.common.network;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.io.IOException;
-import java.io.File;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.config.SSLConfigs;
-import org.apache.kafka.common.security.ssl.SSLFactory;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.test.TestSSLUtils;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Tests for the SSL transport layer. These use a test harness that runs a simple socket server that echos back responses.
- */
-
-public class SSLTransportLayerTest {
-
-    private static final int BUFFER_SIZE = 4 * 1024;
-
-    private SSLEchoServer server;
-    private Selector selector;
-    private ChannelBuilder channelBuilder;
-    private CertStores serverCertStores;
-    private CertStores clientCertStores;
-    private Map<String, Object> sslClientConfigs;
-    private Map<String, Object> sslServerConfigs;
-
-    @Before
-    public void setup() throws Exception {
-        // Create certificates for use by client and server. Add server cert to client truststore and vice versa.
-        serverCertStores = new CertStores(true);
-        clientCertStores = new CertStores(false);
-        sslServerConfigs = serverCertStores.getTrustingConfig(clientCertStores);
-        sslClientConfigs = clientCertStores.getTrustingConfig(serverCertStores);
-
-        this.channelBuilder = new SSLChannelBuilder(Mode.CLIENT);
-        this.channelBuilder.configure(sslClientConfigs);
-        this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", new LinkedHashMap<String, String>(), channelBuilder);
-    }
-
-    @After
-    public void teardown() throws Exception {
-        if (selector != null)
-            this.selector.close();
-        if (server != null)
-            this.server.close();
-    }
-
-    /**
-     * Tests that server certificate with valid IP address is accepted by
-     * a client that validates server endpoint.
-     */
-    @Test
-    public void testValidEndpointIdentification() throws Exception {
-        String node = "0";
-        createEchoServer(sslServerConfigs);
-        sslClientConfigs.put(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS");
-        createSelector(sslClientConfigs);
-        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
-        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
-
-        testClientConnection(node, 100, 10);
-    }
-    
-    /**
-     * Tests that server certificate with invalid IP address is not accepted by
-     * a client that validates server endpoint. Certificate uses "localhost" as
-     * common name, test uses host IP to trigger endpoint validation failure.
-     */
-    @Test
-    public void testInvalidEndpointIdentification() throws Exception {
-        String node = "0";
-        String serverHost = InetAddress.getLocalHost().getHostAddress();
-        server = new SSLEchoServer(sslServerConfigs, serverHost);
-        server.start();
-        sslClientConfigs.put(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS");
-        createSelector(sslClientConfigs);
-        InetSocketAddress addr = new InetSocketAddress(serverHost, server.port);
-        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
-
-        waitForChannelClose(node);
-    }
-    
-    /**
-     * Tests that server certificate with invalid IP address is accepted by
-     * a client that has disabled endpoint validation
-     */
-    @Test
-    public void testEndpointIdentificationDisabled() throws Exception {
-        String node = "0";
-        String serverHost = InetAddress.getLocalHost().getHostAddress();
-        server = new SSLEchoServer(sslServerConfigs, serverHost);
-        server.start();
-        sslClientConfigs.remove(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG);
-        createSelector(sslClientConfigs);
-        InetSocketAddress addr = new InetSocketAddress(serverHost, server.port);
-        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
-
-        testClientConnection(node, 100, 10);
-    }
-    
-    /**
-     * Tests that server accepts connections from clients with a trusted certificate
-     * when client authentication is required.
-     */
-    @Test
-    public void testClientAuthenticationRequiredValidProvided() throws Exception {
-        String node = "0";
-        sslServerConfigs.put(SSLConfigs.SSL_CLIENT_AUTH_CONFIG, "required");
-        createEchoServer(sslServerConfigs);
-        createSelector(sslClientConfigs);
-        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
-        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
-
-        testClientConnection(node, 100, 10);
-    }
-    
-    /**
-     * Tests that server does not accept connections from clients with an untrusted certificate
-     * when client authentication is required.
-     */
-    @Test
-    public void testClientAuthenticationRequiredUntrustedProvided() throws Exception {
-        String node = "0";
-        sslServerConfigs = serverCertStores.getUntrustingConfig();
-        sslServerConfigs.put(SSLConfigs.SSL_CLIENT_AUTH_CONFIG, "required");
-        createEchoServer(sslServerConfigs);        
-        createSelector(sslClientConfigs);
-        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
-        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
-
-        waitForChannelClose(node);
-    }
-    
-    /**
-     * Tests that server does not accept connections from clients which dont
-     * provide a certificate when client authentication is required.
-     */
-    @Test
-    public void testClientAuthenticationRequiredNotProvided() throws Exception {
-        String node = "0";
-        sslServerConfigs.put(SSLConfigs.SSL_CLIENT_AUTH_CONFIG, "required");
-        createEchoServer(sslServerConfigs);
-        
-        sslClientConfigs.remove(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
-        sslClientConfigs.remove(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
-        sslClientConfigs.remove(SSLConfigs.SSL_KEY_PASSWORD_CONFIG);
-        createSelector(sslClientConfigs);
-        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
-        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
-
-        waitForChannelClose(node);
-    }
-    
-    /**
-     * Tests that server accepts connections from a client configured
-     * with an untrusted certificate if client authentication is disabled
-     */
-    @Test
-    public void testClientAuthenticationDisabledUntrustedProvided() throws Exception {
-        String node = "0";
-        sslServerConfigs = serverCertStores.getUntrustingConfig();
-        sslServerConfigs.put(SSLConfigs.SSL_CLIENT_AUTH_CONFIG, "none");
-        createEchoServer(sslServerConfigs);      
-        createSelector(sslClientConfigs);
-        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
-        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
-
-        testClientConnection(node, 100, 10);
-    }
-    
-    /**
-     * Tests that server accepts connections from a client that does not provide
-     * a certificate if client authentication is disabled
-     */
-    @Test
-    public void testClientAuthenticationDisabledNotProvided() throws Exception {
-        String node = "0";
-        sslServerConfigs.put(SSLConfigs.SSL_CLIENT_AUTH_CONFIG, "none");
-        createEchoServer(sslServerConfigs);
-        
-        sslClientConfigs.remove(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
-        sslClientConfigs.remove(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
-        sslClientConfigs.remove(SSLConfigs.SSL_KEY_PASSWORD_CONFIG);
-        createSelector(sslClientConfigs);
-        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
-        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
-
-        testClientConnection(node, 100, 10);
-    }
-    
-    /**
-     * Tests that server accepts connections from a client configured
-     * with a valid certificate if client authentication is requested
-     */
-    @Test
-    public void testClientAuthenticationRequestedValidProvided() throws Exception {
-        String node = "0";
-        sslServerConfigs.put(SSLConfigs.SSL_CLIENT_AUTH_CONFIG, "requested");
-        createEchoServer(sslServerConfigs);
-        createSelector(sslClientConfigs);
-        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
-        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
-
-        testClientConnection(node, 100, 10);
-    }
-    
-    /**
-     * Tests that server accepts connections from a client that does not provide
-     * a certificate if client authentication is requested but not required
-     */
-    @Test
-    public void testClientAuthenticationRequestedNotProvided() throws Exception {
-        String node = "0";
-        sslServerConfigs.put(SSLConfigs.SSL_CLIENT_AUTH_CONFIG, "requested");
-        createEchoServer(sslServerConfigs);
-        
-        sslClientConfigs.remove(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
-        sslClientConfigs.remove(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
-        sslClientConfigs.remove(SSLConfigs.SSL_KEY_PASSWORD_CONFIG);
-        createSelector(sslClientConfigs);
-        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
-        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
-
-        testClientConnection(node, 100, 10);
-    }
-    
-    /**
-     * Tests that channels cannot be created if truststore cannot be loaded
-     */
-    @Test
-    public void testInvalidTruststorePassword() throws Exception {
-        SSLChannelBuilder channelBuilder = new SSLChannelBuilder(Mode.CLIENT);
-        try {
-            sslClientConfigs.put(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "invalid");
-            channelBuilder.configure(sslClientConfigs);
-            fail("SSL channel configured with invalid truststore password");
-        } catch (KafkaException e) {
-            // Expected exception
-        }
-    }
-    
-    /**
-     * Tests that channels cannot be created if keystore cannot be loaded
-     */
-    @Test
-    public void testInvalidKeystorePassword() throws Exception {
-        SSLChannelBuilder channelBuilder = new SSLChannelBuilder(Mode.CLIENT);
-        try {
-            sslClientConfigs.put(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "invalid");
-            channelBuilder.configure(sslClientConfigs);
-            fail("SSL channel configured with invalid keystore password");
-        } catch (KafkaException e) {
-            // Expected exception
-        }
-    }
-    
-    /**
-     * Tests that client connections cannot be created to a server
-     * if key password is invalid
-     */
-    @Test
-    public void testInvalidKeyPassword() throws Exception {
-        String node = "0";
-        sslServerConfigs.put(SSLConfigs.SSL_KEY_PASSWORD_CONFIG, "invalid");
-        createEchoServer(sslServerConfigs);        
-        createSelector(sslClientConfigs);
-        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
-        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
-
-        waitForChannelClose(node);
-    }
-    
-    /**
-     * Tests that connections cannot be made with unsupported TLS versions
-     */
-    @Test
-    public void testUnsupportedTLSVersion() throws Exception {
-        String node = "0";
-        sslServerConfigs.put(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.2"));
-        createEchoServer(sslServerConfigs);
-        
-        sslClientConfigs.put(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.1"));
-        createSelector(sslClientConfigs);
-        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
-        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
-
-        waitForChannelClose(node);
-    }
-    
-    /**
-     * Tests that connections cannot be made with unsupported TLS cipher suites
-     */
-    @Test
-    public void testUnsupportedCiphers() throws Exception {
-        String node = "0";
-        String[] cipherSuites = SSLContext.getDefault().getDefaultSSLParameters().getCipherSuites();
-        sslServerConfigs.put(SSLConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuites[0]));
-        createEchoServer(sslServerConfigs);
-        
-        sslClientConfigs.put(SSLConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuites[1]));
-        createSelector(sslClientConfigs);
-        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
-        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
-
-        waitForChannelClose(node);
-    }
-
-    /**
-     * Tests handling of BUFFER_UNDERFLOW during unwrap when network read buffer is smaller than SSL session packet buffer size.
-     */
-    @Test
-    public void testNetReadBufferResize() throws Exception {
-        String node = "0";
-        createEchoServer(sslServerConfigs);
-        createSelector(sslClientConfigs, 10, null, null);
-        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
-        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
-
-        testClientConnection(node, 64000, 10);
-    }
-    
-    /**
-     * Tests handling of BUFFER_OVERFLOW during wrap when network write buffer is smaller than SSL session packet buffer size.
-     */
-    @Test
-    public void testNetWriteBufferResize() throws Exception {
-        String node = "0";
-        createEchoServer(sslServerConfigs);
-        createSelector(sslClientConfigs, null, 10, null);
-        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
-        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
-
-        testClientConnection(node, 64000, 10);
-    }
-
-    /**
-     * Tests handling of BUFFER_OVERFLOW during unwrap when application read buffer is smaller than SSL session application buffer size.
-     */
-    @Test
-    public void testApplicationBufferResize() throws Exception {
-        String node = "0";
-        createEchoServer(sslServerConfigs);
-        createSelector(sslClientConfigs, null, null, 10);
-        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
-        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
-
-        testClientConnection(node, 64000, 10);
-    }
-
-    private void testClientConnection(String node, int minMessageSize, int messageCount) throws Exception {
-
-        String prefix = TestUtils.randomString(minMessageSize);
-        int requests = 0;
-        int responses = 0;
-        // wait for handshake to finish
-        while (!selector.isChannelReady(node)) {
-            selector.poll(1000L);
-        }
-        selector.send(new NetworkSend(node, ByteBuffer.wrap((prefix + "-0").getBytes())));
-        requests++;
-        while (responses < messageCount) {
-            selector.poll(0L);
-            assertEquals("No disconnects should have occurred.", 0, selector.disconnected().size());
-
-            for (NetworkReceive receive : selector.completedReceives()) {
-                assertEquals(prefix + "-" + responses, new String(Utils.toArray(receive.payload())));
-                responses++;
-            }
-
-            for (int i = 0; i < selector.completedSends().size() && requests < messageCount && selector.isChannelReady(node); i++, requests++) {
-                selector.send(new NetworkSend(node, ByteBuffer.wrap((prefix + "-" + requests).getBytes())));
-            }
-        }
-    }
-    
-    private void waitForChannelClose(String node) throws IOException {
-        boolean closed = false;
-        for (int i = 0; i < 30; i++) {
-            selector.poll(1000L);
-            if (selector.channel(node) == null) {
-                closed = true;
-                break;
-            }
-        }
-        assertTrue(closed);
-    }
-    
-    private void createEchoServer(Map<String, Object> sslServerConfigs) throws Exception {
-        server = new SSLEchoServer(sslServerConfigs, "localhost");
-        server.start();
-    }
-    
-    private void createSelector(Map<String, Object> sslClientConfigs) {
-        createSelector(sslClientConfigs, null, null, null);
-    }      
-
-    private void createSelector(Map<String, Object> sslClientConfigs, final Integer netReadBufSize, final Integer netWriteBufSize, final Integer appBufSize) {
-        
-        this.channelBuilder = new SSLChannelBuilder(Mode.CLIENT) {
-
-            @Override
-            protected SSLTransportLayer buildTransportLayer(SSLFactory sslFactory, String id, SelectionKey key) throws IOException {
-                SocketChannel socketChannel = (SocketChannel) key.channel();
-                SSLEngine sslEngine = sslFactory.createSSLEngine(socketChannel.socket().getInetAddress().getHostName(),
-                                socketChannel.socket().getPort());
-                TestSSLTransportLayer transportLayer = new TestSSLTransportLayer(id, key, sslEngine, netReadBufSize, netWriteBufSize, appBufSize);
-                transportLayer.startHandshake();
-                return transportLayer;
-            }
-
-
-        };
-        this.channelBuilder.configure(sslClientConfigs);
-        this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", new LinkedHashMap<String, String>(), channelBuilder);
-    }
-    
-    private static class CertStores {
-        
-        Map<String, Object> sslConfig;
-        
-        CertStores(boolean server) throws Exception {
-            String name = server ? "server" : "client";
-            Mode mode = server ? Mode.SERVER : Mode.CLIENT;
-            File truststoreFile = File.createTempFile(name + "TS", ".jks");
-            sslConfig = TestSSLUtils.createSSLConfig(!server, true, mode, truststoreFile, name);
-            sslConfig.put(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
-        }
-       
-        private Map<String, Object> getTrustingConfig(CertStores truststoreConfig) {
-            Map<String, Object> config = new HashMap<String, Object>(sslConfig);
-            config.put(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreConfig.sslConfig.get(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
-            config.put(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststoreConfig.sslConfig.get(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
-            config.put(SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, truststoreConfig.sslConfig.get(SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG));
-            return config;
-        }
-        
-        private Map<String, Object> getUntrustingConfig() {
-            return sslConfig;
-        }
-    }
-
-    /**
-     * SSLTransportLayer with overrides for packet and application buffer size to test buffer resize
-     * code path. The overridden buffer size starts with a small value and increases in size when the buffer
-     * size is retrieved to handle overflow/underflow, until the actual session buffer size is reached.
-     */
-    private static class TestSSLTransportLayer extends SSLTransportLayer {
-
-        private final ResizeableBufferSize netReadBufSize;
-        private final ResizeableBufferSize netWriteBufSize;
-        private final ResizeableBufferSize appBufSize;
-
-        public TestSSLTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine, 
-                Integer netReadBufSize, Integer netWriteBufSize, Integer appBufSize) throws IOException {
-            super(channelId, key, sslEngine, false);
-            this.netReadBufSize = new ResizeableBufferSize(netReadBufSize);
-            this.netWriteBufSize = new ResizeableBufferSize(netWriteBufSize);
-            this.appBufSize = new ResizeableBufferSize(appBufSize);
-        }
-        
-        @Override
-        protected int netReadBufferSize() {
-            ByteBuffer netReadBuffer = netReadBuffer();
-            // netReadBufferSize() is invoked in SSLTransportLayer.read() prior to the read
-            // operation. To avoid the read buffer being expanded too early, increase buffer size
-            // only when read buffer is full. This ensures that BUFFER_UNDERFLOW is always
-            // triggered in testNetReadBufferResize().
-            boolean updateBufSize = netReadBuffer != null && !netReadBuffer().hasRemaining();
-            return netReadBufSize.updateAndGet(super.netReadBufferSize(), updateBufSize);
-        }
-        
-        @Override
-        protected int netWriteBufferSize() {
-            return netWriteBufSize.updateAndGet(super.netWriteBufferSize(), true);
-        }
-
-        @Override
-        protected int applicationBufferSize() {
-            return appBufSize.updateAndGet(super.applicationBufferSize(), true);
-        }
-        
-        private static class ResizeableBufferSize {
-            private Integer bufSizeOverride;
-            ResizeableBufferSize(Integer bufSizeOverride) {
-                this.bufSizeOverride = bufSizeOverride;
-            }
-            int updateAndGet(int actualSize, boolean update) {
-                int size = actualSize;
-                if (bufSizeOverride != null) {
-                    if (update)
-                        bufSizeOverride = Math.min(bufSizeOverride * 2, size);
-                    size = bufSizeOverride;
-                }
-                return size;
-            }
-        }
-    }
-    
-    // Non-blocking EchoServer implementation that uses SSLTransportLayer
-    private class SSLEchoServer extends Thread {
-        private final int port;
-        private final ServerSocketChannel serverSocketChannel;
-        private final List<SocketChannel> newChannels;
-        private final List<SocketChannel> socketChannels;
-        private final AcceptorThread acceptorThread;
-        private SSLFactory sslFactory;
-        private final Selector selector;
-        private final ConcurrentLinkedQueue<NetworkSend> inflightSends = new ConcurrentLinkedQueue<NetworkSend>();
-
-        public SSLEchoServer(Map<String, ?> configs, String serverHost) throws Exception {
-            this.sslFactory = new SSLFactory(Mode.SERVER);
-            this.sslFactory.configure(configs);
-            serverSocketChannel = ServerSocketChannel.open();
-            serverSocketChannel.configureBlocking(false);
-            serverSocketChannel.socket().bind(new InetSocketAddress(serverHost, 0));
-            this.port = serverSocketChannel.socket().getLocalPort();
-            this.socketChannels = Collections.synchronizedList(new ArrayList<SocketChannel>());
-            this.newChannels = Collections.synchronizedList(new ArrayList<SocketChannel>());
-            SSLChannelBuilder channelBuilder = new SSLChannelBuilder(Mode.SERVER);
-            channelBuilder.configure(sslServerConfigs);
-            this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", new LinkedHashMap<String, String>(), channelBuilder);
-            setName("echoserver");
-            setDaemon(true);
-            acceptorThread = new AcceptorThread();
-        }
-
-        @Override
-        public void run() {
-            try {
-                acceptorThread.start();
-                while (serverSocketChannel.isOpen()) {
-                    selector.poll(1000);
-                    for (SocketChannel socketChannel : newChannels) {
-                        String id = id(socketChannel);
-                        selector.register(id, socketChannel);
-                        socketChannels.add(socketChannel);
-                    }
-                    newChannels.clear();
-                    while (true) {
-                        NetworkSend send = inflightSends.peek();
-                        if (send != null && !selector.channel(send.destination()).hasSend()) {
-                            send = inflightSends.poll();
-                            selector.send(send);
-                        } else
-                            break;
-                    }
-                    List<NetworkReceive> completedReceives = selector.completedReceives();
-                    for (NetworkReceive rcv : completedReceives) {
-                        NetworkSend send = new NetworkSend(rcv.source(), rcv.payload());
-                        if (!selector.channel(send.destination()).hasSend())
-                            selector.send(send);
-                        else
-                            inflightSends.add(send);
-                    }
-                }
-            } catch (IOException e) {
-                // ignore
-            }
-        }
-        
-        private String id(SocketChannel channel) {
-            return channel.socket().getLocalAddress().getHostAddress() + ":" + channel.socket().getLocalPort() + "-" +
-                    channel.socket().getInetAddress().getHostAddress() + ":" + channel.socket().getPort();
-        }
-
-        public void closeConnections() throws IOException {
-            for (SocketChannel channel : socketChannels)
-                channel.close();
-            socketChannels.clear();
-        }
-
-        public void close() throws IOException, InterruptedException {
-            this.serverSocketChannel.close();
-            closeConnections();
-            acceptorThread.interrupt();
-            acceptorThread.join();
-            interrupt();
-            join();
-        }
-        
-        private class AcceptorThread extends Thread {
-            public AcceptorThread() throws IOException {
-                setName("acceptor");
-            }
-            public void run() {
-                try {
-
-                    java.nio.channels.Selector acceptSelector = java.nio.channels.Selector.open();
-                    serverSocketChannel.register(acceptSelector, SelectionKey.OP_ACCEPT);
-                    while (serverSocketChannel.isOpen()) {
-                        if (acceptSelector.select(1000) > 0) {
-                            Iterator<SelectionKey> it = acceptSelector.selectedKeys().iterator();
-                            while (it.hasNext()) {
-                                SelectionKey key = it.next();
-                                if (key.isAcceptable()) {
-                                    SocketChannel socketChannel = ((ServerSocketChannel) key.channel()).accept();
-                                    socketChannel.configureBlocking(false);
-                                    newChannels.add(socketChannel);
-                                    selector.wakeup();
-                                }
-                            }
-                        }
-                    }
-                } catch (IOException e) {
-                    // ignore
-                }
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index 683eeee..8ce0298 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -24,7 +24,7 @@ import java.nio.ByteBuffer;
 import java.util.*;
 
 import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.config.SSLConfigs;
+import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -49,7 +49,7 @@ public class SelectorTest {
     @Before
     public void setup() throws Exception {
         Map<String, Object> configs = new HashMap<String, Object>();
-        configs.put(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
+        configs.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
         this.server = new EchoServer(configs);
         this.server.start();
         this.time = new MockTime();

http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
new file mode 100644
index 0000000..94c5654
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.security.ssl.SslFactory;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.test.TestSslUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * A set of tests for the selector over SSL. These use a test harness that runs a simple socket server that echoes back responses.
+ */
+public class SslSelectorTest extends SelectorTest {
+
+    private Metrics metrics;
+    private Map<String, Object> sslClientConfigs;
+
+    @Before
+    public void setup() throws Exception {
+        File trustStoreFile = File.createTempFile("truststore", ".jks");
+
+        Map<String, Object> sslServerConfigs = TestSslUtils.createSslConfig(false, true, Mode.SERVER, trustStoreFile, "server");
+        sslServerConfigs.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
+        this.server = new EchoServer(sslServerConfigs);
+        this.server.start();
+        this.time = new MockTime();
+        sslClientConfigs = TestSslUtils.createSslConfig(false, false, Mode.SERVER, trustStoreFile, "client");
+        sslClientConfigs.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
+
+        this.channelBuilder = new SslChannelBuilder(Mode.CLIENT);
+        this.channelBuilder.configure(sslClientConfigs);
+        this.metrics = new Metrics();
+        this.selector = new Selector(5000, metrics, time, "MetricGroup", new LinkedHashMap<String, String>(), channelBuilder);
+    }
+
+    @After
+    public void teardown() throws Exception {
+        this.selector.close();
+        this.server.close();
+        this.metrics.close();
+    }
+
+    /**
+     * Tests that SSL renegotiation initiated by the server is handled correctly by the client
+     * @throws Exception
+     */
+    @Test
+    public void testRenegotiation() throws Exception {
+        ChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT) {
+            @Override
+            protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, String id, SelectionKey key) throws IOException {
+                SocketChannel socketChannel = (SocketChannel) key.channel();
+                SslTransportLayer transportLayer = new SslTransportLayer(id, key,
+                    sslFactory.createSslEngine(socketChannel.socket().getInetAddress().getHostName(), socketChannel.socket().getPort()),
+                    true);
+                transportLayer.startHandshake();
+                return transportLayer;
+            }
+        };
+        channelBuilder.configure(sslClientConfigs);
+        Selector selector = new Selector(5000, metrics, time, "MetricGroup2", new LinkedHashMap<String, String>(), channelBuilder);
+        try {
+            int reqs = 500;
+            String node = "0";
+            // create connections
+            InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+            selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+            // send echo requests and receive responses
+            int requests = 0;
+            int responses = 0;
+            int renegotiates = 0;
+            while (!selector.isChannelReady(node)) {
+                selector.poll(1000L);
+            }
+            selector.send(createSend(node, node + "-" + 0));
+            requests++;
+
+            // loop until we complete all requests
+            while (responses < reqs) {
+                selector.poll(0L);
+                if (responses >= 100 && renegotiates == 0) {
+                    renegotiates++;
+                    server.renegotiate();
+                }
+                assertEquals("No disconnects should have occurred.", 0, selector.disconnected().size());
+
+                // handle any responses we may have gotten
+                for (NetworkReceive receive : selector.completedReceives()) {
+                    String[] pieces = asString(receive).split("-");
+                    assertEquals("Should be in the form 'conn-counter'", 2, pieces.length);
+                    assertEquals("Check the source", receive.source(), pieces[0]);
+                    assertEquals("Check that the receive has kindly been rewound", 0, receive.payload().position());
+                    assertEquals("Check the request counter", responses, Integer.parseInt(pieces[1]));
+                    responses++;
+                }
+
+                // prepare new sends for the next round
+                for (int i = 0; i < selector.completedSends().size() && requests < reqs && selector.isChannelReady(node); i++, requests++) {
+                    selector.send(createSend(node, node + "-" + requests));
+                }
+            }
+        } finally {
+            selector.close();
+        }
+    }
+
+    @Test
+    public void testDisabledRenegotiation() throws Exception {
+        String node = "0";
+        // create connections
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+        // send echo requests and receive responses
+        while (!selector.isChannelReady(node)) {
+            selector.poll(1000L);
+        }
+        selector.send(createSend(node, node + "-" + 0));
+        selector.poll(0L);
+        server.renegotiate();
+        selector.send(createSend(node, node + "-" + 1));
+        long expiryTime = System.currentTimeMillis() + 2000;
+
+        List<String> disconnected = new ArrayList<>();
+        while (!disconnected.contains(node) && System.currentTimeMillis() < expiryTime) {
+            selector.poll(10);
+            disconnected.addAll(selector.disconnected());
+        }
+        assertTrue("Renegotiation should cause disconnection", disconnected.contains(node));
+
+    }
+
+    /**
+     * Connects and waits for handshake to complete. This is required since the SslTransportLayer
+     * implementation requires the channel to be ready before send is invoked (unlike plaintext
+     * where send can be invoked straight after connect).
+     */
+    protected void connect(String node, InetSocketAddress serverAddr) throws IOException {
+        blockingConnect(node, serverAddr);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
new file mode 100644
index 0000000..91bd47c
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -0,0 +1,651 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.io.IOException;
+import java.io.File;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.security.ssl.SslFactory;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.TestSslUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for the SSL transport layer. These use a test harness that runs a simple socket server that echoes back responses.
+ */
+public class SslTransportLayerTest {
+
+    private static final int BUFFER_SIZE = 4 * 1024;
+
+    private SslEchoServer server;
+    private Selector selector;
+    private ChannelBuilder channelBuilder;
+    private CertStores serverCertStores;
+    private CertStores clientCertStores;
+    private Map<String, Object> sslClientConfigs;
+    private Map<String, Object> sslServerConfigs;
+
+    @Before
+    public void setup() throws Exception {
+        // Create certificates for use by client and server. Add server cert to client truststore and vice versa.
+        serverCertStores = new CertStores(true);
+        clientCertStores = new CertStores(false);
+        sslServerConfigs = serverCertStores.getTrustingConfig(clientCertStores);
+        sslClientConfigs = clientCertStores.getTrustingConfig(serverCertStores);
+
+        this.channelBuilder = new SslChannelBuilder(Mode.CLIENT);
+        this.channelBuilder.configure(sslClientConfigs);
+        this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", new LinkedHashMap<String, String>(), channelBuilder);
+    }
+
+    @After
+    public void teardown() throws Exception {
+        if (selector != null)
+            this.selector.close();
+        if (server != null)
+            this.server.close();
+    }
+
+    /**
+     * Tests that a server certificate with a valid host name is accepted by
+     * a client that validates the server endpoint.
+     */
+    @Test
+    public void testValidEndpointIdentification() throws Exception {
+        String node = "0";
+        createEchoServer(sslServerConfigs);
+        sslClientConfigs.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS");
+        createSelector(sslClientConfigs);
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+        testClientConnection(node, 100, 10);
+    }
+    
+    /**
+     * Tests that server certificate with invalid IP address is not accepted by
+     * a client that validates server endpoint. Certificate uses "localhost" as
+     * common name, test uses host IP to trigger endpoint validation failure.
+     */
+    @Test
+    public void testInvalidEndpointIdentification() throws Exception {
+        String node = "0";
+        String serverHost = InetAddress.getLocalHost().getHostAddress();
+        server = new SslEchoServer(sslServerConfigs, serverHost);
+        server.start();
+        sslClientConfigs.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS");
+        createSelector(sslClientConfigs);
+        InetSocketAddress addr = new InetSocketAddress(serverHost, server.port);
+        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+        waitForChannelClose(node);
+    }
+    
+    /**
+     * Tests that server certificate with invalid IP address is accepted by
+     * a client that has disabled endpoint validation
+     */
+    @Test
+    public void testEndpointIdentificationDisabled() throws Exception {
+        String node = "0";
+        String serverHost = InetAddress.getLocalHost().getHostAddress();
+        server = new SslEchoServer(sslServerConfigs, serverHost);
+        server.start();
+        sslClientConfigs.remove(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG);
+        createSelector(sslClientConfigs);
+        InetSocketAddress addr = new InetSocketAddress(serverHost, server.port);
+        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+        testClientConnection(node, 100, 10);
+    }
+    
+    /**
+     * Tests that server accepts connections from clients with a trusted certificate
+     * when client authentication is required.
+     */
+    @Test
+    public void testClientAuthenticationRequiredValidProvided() throws Exception {
+        String node = "0";
+        sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required");
+        createEchoServer(sslServerConfigs);
+        createSelector(sslClientConfigs);
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+        testClientConnection(node, 100, 10);
+    }
+    
+    /**
+     * Tests that server does not accept connections from clients with an untrusted certificate
+     * when client authentication is required.
+     */
+    @Test
+    public void testClientAuthenticationRequiredUntrustedProvided() throws Exception {
+        String node = "0";
+        sslServerConfigs = serverCertStores.getUntrustingConfig();
+        sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required");
+        createEchoServer(sslServerConfigs);        
+        createSelector(sslClientConfigs);
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+        waitForChannelClose(node);
+    }
+    
+    /**
+     * Tests that server does not accept connections from clients which do not
+     * provide a certificate when client authentication is required.
+     */
+    @Test
+    public void testClientAuthenticationRequiredNotProvided() throws Exception {
+        String node = "0";
+        sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required");
+        createEchoServer(sslServerConfigs);
+        
+        sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
+        sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
+        sslClientConfigs.remove(SslConfigs.SSL_KEY_PASSWORD_CONFIG);
+        createSelector(sslClientConfigs);
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+        waitForChannelClose(node);
+    }
+    
+    /**
+     * Tests that server accepts connections from a client configured
+     * with an untrusted certificate if client authentication is disabled
+     */
+    @Test
+    public void testClientAuthenticationDisabledUntrustedProvided() throws Exception {
+        String node = "0";
+        sslServerConfigs = serverCertStores.getUntrustingConfig();
+        sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "none");
+        createEchoServer(sslServerConfigs);      
+        createSelector(sslClientConfigs);
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+        testClientConnection(node, 100, 10);
+    }
+    
+    /**
+     * Tests that server accepts connections from a client that does not provide
+     * a certificate if client authentication is disabled
+     */
+    @Test
+    public void testClientAuthenticationDisabledNotProvided() throws Exception {
+        String node = "0";
+        sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "none");
+        createEchoServer(sslServerConfigs);
+        
+        sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
+        sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
+        sslClientConfigs.remove(SslConfigs.SSL_KEY_PASSWORD_CONFIG);
+        createSelector(sslClientConfigs);
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+        testClientConnection(node, 100, 10);
+    }
+    
+    /**
+     * Tests that server accepts connections from a client configured
+     * with a valid certificate if client authentication is requested
+     */
+    @Test
+    public void testClientAuthenticationRequestedValidProvided() throws Exception {
+        String node = "0";
+        sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "requested");
+        createEchoServer(sslServerConfigs);
+        createSelector(sslClientConfigs);
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+        testClientConnection(node, 100, 10);
+    }
+    
+    /**
+     * Tests that server accepts connections from a client that does not provide
+     * a certificate if client authentication is requested but not required
+     */
+    @Test
+    public void testClientAuthenticationRequestedNotProvided() throws Exception {
+        String node = "0";
+        sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "requested");
+        createEchoServer(sslServerConfigs);
+        
+        sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
+        sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
+        sslClientConfigs.remove(SslConfigs.SSL_KEY_PASSWORD_CONFIG);
+        createSelector(sslClientConfigs);
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+        testClientConnection(node, 100, 10);
+    }
+    
+    /**
+     * Tests that channels cannot be created if truststore cannot be loaded
+     */
+    @Test
+    public void testInvalidTruststorePassword() throws Exception {
+        SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT);
+        try {
+            sslClientConfigs.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "invalid");
+            channelBuilder.configure(sslClientConfigs);
+            fail("SSL channel configured with invalid truststore password");
+        } catch (KafkaException e) {
+            // Expected exception
+        }
+    }
+    
+    /**
+     * Tests that channels cannot be created if keystore cannot be loaded
+     */
+    @Test
+    public void testInvalidKeystorePassword() throws Exception {
+        SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT);
+        try {
+            sslClientConfigs.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "invalid");
+            channelBuilder.configure(sslClientConfigs);
+            fail("SSL channel configured with invalid keystore password");
+        } catch (KafkaException e) {
+            // Expected exception
+        }
+    }
+    
+    /**
+     * Tests that client connections cannot be created to a server
+     * if the server's key password is invalid
+     */
+    @Test
+    public void testInvalidKeyPassword() throws Exception {
+        String node = "0";
+        sslServerConfigs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "invalid");
+        createEchoServer(sslServerConfigs);        
+        createSelector(sslClientConfigs);
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+        waitForChannelClose(node);
+    }
+    
+    /**
+     * Tests that connections cannot be made with unsupported TLS versions
+     */
+    @Test
+    public void testUnsupportedTLSVersion() throws Exception {
+        String node = "0";
+        sslServerConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.2"));
+        createEchoServer(sslServerConfigs);
+        
+        sslClientConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.1"));
+        createSelector(sslClientConfigs);
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+        waitForChannelClose(node);
+    }
+    
+    /**
+     * Tests that connections cannot be made with unsupported TLS cipher suites
+     */
+    @Test
+    public void testUnsupportedCiphers() throws Exception {
+        String node = "0";
+        String[] cipherSuites = SSLContext.getDefault().getDefaultSSLParameters().getCipherSuites();
+        sslServerConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuites[0]));
+        createEchoServer(sslServerConfigs);
+        
+        sslClientConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuites[1]));
+        createSelector(sslClientConfigs);
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+        waitForChannelClose(node);
+    }
+
+    /**
+     * Tests handling of BUFFER_UNDERFLOW during unwrap when network read buffer is smaller than SSL session packet buffer size.
+     */
+    @Test
+    public void testNetReadBufferResize() throws Exception {
+        String node = "0";
+        createEchoServer(sslServerConfigs);
+        createSelector(sslClientConfigs, 10, null, null);
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+        testClientConnection(node, 64000, 10);
+    }
+    
+    /**
+     * Tests handling of BUFFER_OVERFLOW during wrap when network write buffer is smaller than SSL session packet buffer size.
+     */
+    @Test
+    public void testNetWriteBufferResize() throws Exception {
+        String node = "0";
+        createEchoServer(sslServerConfigs);
+        createSelector(sslClientConfigs, null, 10, null);
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+        testClientConnection(node, 64000, 10);
+    }
+
+    /**
+     * Tests handling of BUFFER_OVERFLOW during unwrap when application read buffer is smaller than SSL session application buffer size.
+     */
+    @Test
+    public void testApplicationBufferResize() throws Exception {
+        String node = "0";
+        createEchoServer(sslServerConfigs);
+        createSelector(sslClientConfigs, null, null, 10);
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+        testClientConnection(node, 64000, 10);
+    }
+
+    private void testClientConnection(String node, int minMessageSize, int messageCount) throws Exception {
+
+        String prefix = TestUtils.randomString(minMessageSize);
+        int requests = 0;
+        int responses = 0;
+        // wait for handshake to finish
+        while (!selector.isChannelReady(node)) {
+            selector.poll(1000L);
+        }
+        selector.send(new NetworkSend(node, ByteBuffer.wrap((prefix + "-0").getBytes())));
+        requests++;
+        while (responses < messageCount) {
+            selector.poll(0L);
+            assertEquals("No disconnects should have occurred.", 0, selector.disconnected().size());
+
+            for (NetworkReceive receive : selector.completedReceives()) {
+                assertEquals(prefix + "-" + responses, new String(Utils.toArray(receive.payload())));
+                responses++;
+            }
+
+            for (int i = 0; i < selector.completedSends().size() && requests < messageCount && selector.isChannelReady(node); i++, requests++) {
+                selector.send(new NetworkSend(node, ByteBuffer.wrap((prefix + "-" + requests).getBytes())));
+            }
+        }
+    }
+    
+    private void waitForChannelClose(String node) throws IOException {
+        boolean closed = false;
+        for (int i = 0; i < 30; i++) {
+            selector.poll(1000L);
+            if (selector.channel(node) == null) {
+                closed = true;
+                break;
+            }
+        }
+        assertTrue(closed);
+    }
+    
+    private void createEchoServer(Map<String, Object> sslServerConfigs) throws Exception {
+        server = new SslEchoServer(sslServerConfigs, "localhost");
+        server.start();
+    }
+    
+    private void createSelector(Map<String, Object> sslClientConfigs) {
+        createSelector(sslClientConfigs, null, null, null);
+    }      
+
+    private void createSelector(Map<String, Object> sslClientConfigs, final Integer netReadBufSize, final Integer netWriteBufSize, final Integer appBufSize) {
+        // Rebuilds this.selector so that its channels use TestSslTransportLayer with the given buffer size overrides
+        // (null = no override). The selector being replaced, e.g. the one from setup(), is closed first to avoid a leak.
+        if (this.selector != null)
+            this.selector.close();
+        this.channelBuilder = new SslChannelBuilder(Mode.CLIENT) {
+            @Override
+            protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, String id, SelectionKey key) throws IOException {
+                SocketChannel socketChannel = (SocketChannel) key.channel();
+                SSLEngine sslEngine = sslFactory.createSslEngine(socketChannel.socket().getInetAddress().getHostName(),
+                                socketChannel.socket().getPort());
+                TestSslTransportLayer transportLayer = new TestSslTransportLayer(id, key, sslEngine, netReadBufSize, netWriteBufSize, appBufSize);
+                transportLayer.startHandshake();
+                return transportLayer;
+            }
+        };
+        this.channelBuilder.configure(sslClientConfigs);
+        this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", new LinkedHashMap<String, String>(), channelBuilder);
+    }
+    
+    private static class CertStores {
+        // SSL configuration generated by TestSslUtils for one endpoint ("server" or "client").
+        Map<String, Object> sslConfig;
+        // Builds the config for the given role via TestSslUtils.createSslConfig, using a fresh temporary truststore file.
+        CertStores(boolean server) throws Exception {
+            String name = server ? "server" : "client";
+            Mode mode = server ? Mode.SERVER : Mode.CLIENT;
+            File truststoreFile = File.createTempFile(name + "TS", ".jks");
+            sslConfig = TestSslUtils.createSslConfig(!server, true, mode, truststoreFile, name);
+            sslConfig.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
+        }
+        // Returns a copy of this endpoint's config whose truststore settings are taken from truststoreConfig.
+        private Map<String, Object> getTrustingConfig(CertStores truststoreConfig) {
+            Map<String, Object> config = new HashMap<String, Object>(sslConfig);
+            config.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreConfig.sslConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
+            config.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststoreConfig.sslConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
+            config.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, truststoreConfig.sslConfig.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG));
+            return config;
+        }
+        // Returns this endpoint's own config unchanged; a copy, so callers can put() into it without altering this store.
+        private Map<String, Object> getUntrustingConfig() {
+            return new HashMap<String, Object>(sslConfig);
+        }
+    }
+
+    /**
+     * SslTransportLayer with overrides for packet and application buffer size to test buffer resize
+     * code path. The overridden buffer size starts with a small value and increases in size when the buffer
+     * size is retrieved to handle overflow/underflow, until the actual session buffer size is reached.
+     */
+    private static class TestSslTransportLayer extends SslTransportLayer {
+
+        private final ResizeableBufferSize netReadBufSize;
+        private final ResizeableBufferSize netWriteBufSize;
+        private final ResizeableBufferSize appBufSize;
+        // A null size argument leaves the corresponding buffer size un-overridden.
+        public TestSslTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine,
+                                     Integer netReadBufSize, Integer netWriteBufSize, Integer appBufSize) throws IOException {
+            super(channelId, key, sslEngine, false);
+            this.netReadBufSize = new ResizeableBufferSize(netReadBufSize);
+            this.netWriteBufSize = new ResizeableBufferSize(netWriteBufSize);
+            this.appBufSize = new ResizeableBufferSize(appBufSize);
+        }
+
+        @Override
+        protected int netReadBufferSize() {
+            ByteBuffer netReadBuffer = netReadBuffer();
+            // netReadBufferSize() is invoked in SslTransportLayer.read() prior to the read
+            // operation. To avoid the read buffer being expanded too early, increase buffer size
+            // only when read buffer is full. This ensures that BUFFER_UNDERFLOW is always
+            // triggered in testNetReadBufferResize().
+            boolean updateBufSize = netReadBuffer != null && !netReadBuffer.hasRemaining();
+            return netReadBufSize.updateAndGet(super.netReadBufferSize(), updateBufSize);
+        }
+
+        @Override
+        protected int netWriteBufferSize() {
+            return netWriteBufSize.updateAndGet(super.netWriteBufferSize(), true);
+        }
+
+        @Override
+        protected int applicationBufferSize() {
+            return appBufSize.updateAndGet(super.applicationBufferSize(), true);
+        }
+        // Buffer size that, when overridden (non-null), doubles on each update, capped at the actual size.
+        private static class ResizeableBufferSize {
+            private Integer bufSizeOverride;
+            ResizeableBufferSize(Integer bufSizeOverride) {
+                this.bufSizeOverride = bufSizeOverride;
+            }
+            int updateAndGet(int actualSize, boolean update) {
+                int size = actualSize;
+                if (bufSizeOverride != null) {
+                    if (update)
+                        bufSizeOverride = Math.min(bufSizeOverride * 2, size);
+                    size = bufSizeOverride;
+                }
+                return size;
+            }
+        }
+    }
+    
+    // Non-blocking EchoServer implementation that uses SslTransportLayer
+    private class SslEchoServer extends Thread {
+        private final int port;
+        private final ServerSocketChannel serverSocketChannel;
+        private final List<SocketChannel> newChannels;
+        private final List<SocketChannel> socketChannels;
+        private final AcceptorThread acceptorThread;
+        private SslFactory sslFactory;
+        private final Selector selector;
+        private final ConcurrentLinkedQueue<NetworkSend> inflightSends = new ConcurrentLinkedQueue<NetworkSend>();
+
+        public SslEchoServer(Map<String, ?> configs, String serverHost) throws Exception {
+            this.sslFactory = new SslFactory(Mode.SERVER);
+            this.sslFactory.configure(configs);
+            serverSocketChannel = ServerSocketChannel.open();
+            serverSocketChannel.configureBlocking(false);
+            serverSocketChannel.socket().bind(new InetSocketAddress(serverHost, 0));
+            this.port = serverSocketChannel.socket().getLocalPort();
+            this.socketChannels = Collections.synchronizedList(new ArrayList<SocketChannel>());
+            this.newChannels = Collections.synchronizedList(new ArrayList<SocketChannel>());
+            SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.SERVER);
+            channelBuilder.configure(sslServerConfigs);
+            this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", new LinkedHashMap<String, String>(), channelBuilder);
+            setName("echoserver");
+            setDaemon(true);
+            acceptorThread = new AcceptorThread();
+        }
+
+        @Override
+        public void run() {
+            try {
+                acceptorThread.start();
+                while (serverSocketChannel.isOpen()) {
+                    selector.poll(1000);
+                    for (SocketChannel socketChannel : newChannels) {
+                        String id = id(socketChannel);
+                        selector.register(id, socketChannel);
+                        socketChannels.add(socketChannel);
+                    }
+                    newChannels.clear();
+                    while (true) {
+                        NetworkSend send = inflightSends.peek();
+                        if (send != null && !selector.channel(send.destination()).hasSend()) {
+                            send = inflightSends.poll();
+                            selector.send(send);
+                        } else
+                            break;
+                    }
+                    List<NetworkReceive> completedReceives = selector.completedReceives();
+                    for (NetworkReceive rcv : completedReceives) {
+                        NetworkSend send = new NetworkSend(rcv.source(), rcv.payload());
+                        if (!selector.channel(send.destination()).hasSend())
+                            selector.send(send);
+                        else
+                            inflightSends.add(send);
+                    }
+                }
+            } catch (IOException e) {
+                // ignore
+            }
+        }
+        
+        private String id(SocketChannel channel) {
+            return channel.socket().getLocalAddress().getHostAddress() + ":" + channel.socket().getLocalPort() + "-" +
+                    channel.socket().getInetAddress().getHostAddress() + ":" + channel.socket().getPort();
+        }
+
+        public void closeConnections() throws IOException {
+            for (SocketChannel channel : socketChannels)
+                channel.close();
+            socketChannels.clear();
+        }
+
+        public void close() throws IOException, InterruptedException {
+            this.serverSocketChannel.close();
+            closeConnections();
+            acceptorThread.interrupt();
+            acceptorThread.join();
+            interrupt();
+            join();
+        }
+        
+        private class AcceptorThread extends Thread {
+            public AcceptorThread() throws IOException {
+                setName("acceptor");
+            }
+            public void run() {
+                try {
+
+                    java.nio.channels.Selector acceptSelector = java.nio.channels.Selector.open();
+                    serverSocketChannel.register(acceptSelector, SelectionKey.OP_ACCEPT);
+                    while (serverSocketChannel.isOpen()) {
+                        if (acceptSelector.select(1000) > 0) {
+                            Iterator<SelectionKey> it = acceptSelector.selectedKeys().iterator();
+                            while (it.hasNext()) {
+                                SelectionKey key = it.next();
+                                if (key.isAcceptable()) {
+                                    SocketChannel socketChannel = ((ServerSocketChannel) key.channel()).accept();
+                                    socketChannel.configureBlocking(false);
+                                    newChannels.add(socketChannel);
+                                    selector.wakeup();
+                                }
+                            }
+                        }
+                    }
+                } catch (IOException e) {
+                    // ignore
+                }
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/clients/src/test/java/org/apache/kafka/common/security/ssl/SSLFactoryTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/SSLFactoryTest.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/SSLFactoryTest.java
deleted file mode 100644
index e90ec2b..0000000
--- a/clients/src/test/java/org/apache/kafka/common/security/ssl/SSLFactoryTest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.common.security.ssl;
-
-import javax.net.ssl.*;
-
-import java.io.File;
-import java.util.Map;
-
-import org.apache.kafka.test.TestSSLUtils;
-import org.apache.kafka.common.network.Mode;
-
-import org.junit.Test;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertTrue;
-
-
-/**
- * A set of tests for the selector over ssl. These use a test harness that runs a simple socket server that echos back responses.
- */
-
-public class SSLFactoryTest {
-
-    @Test
-    public void testSSLFactoryConfiguration() throws Exception {
-        File trustStoreFile = File.createTempFile("truststore", ".jks");
-        Map<String, Object> serverSSLConfig = TestSSLUtils.createSSLConfig(false, true, Mode.SERVER, trustStoreFile, "server");
-        SSLFactory sslFactory = new SSLFactory(Mode.SERVER);
-        sslFactory.configure(serverSSLConfig);
-        //host and port are hints
-        SSLEngine engine = sslFactory.createSSLEngine("localhost", 0);
-        assertNotNull(engine);
-        String[] expectedProtocols = {"TLSv1.2"};
-        assertArrayEquals(expectedProtocols, engine.getEnabledProtocols());
-        assertEquals(false, engine.getUseClientMode());
-    }
-
-    @Test
-    public void testClientMode() throws Exception {
-        File trustStoreFile = File.createTempFile("truststore", ".jks");
-        Map<String, Object> clientSSLConfig = TestSSLUtils.createSSLConfig(false, true, Mode.CLIENT, trustStoreFile, "client");
-        SSLFactory sslFactory = new SSLFactory(Mode.CLIENT);
-        sslFactory.configure(clientSSLConfig);
-        //host and port are hints
-        SSLEngine engine = sslFactory.createSSLEngine("localhost", 0);
-        assertTrue(engine.getUseClientMode());
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
new file mode 100644
index 0000000..b5710aa
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.security.ssl;
+
+import javax.net.ssl.*;
+
+import java.io.File;
+import java.util.Map;
+
+import org.apache.kafka.test.TestSslUtils;
+import org.apache.kafka.common.network.Mode;
+
+import org.junit.Test;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Tests that {@code SslFactory} creates {@code SSLEngine} instances with the expected enabled protocols and client/server mode.
+ */
+
+public class SslFactoryTest {
+
+    /**
+     * A factory configured in server mode must create an engine whose only enabled protocol is
+     * TLSv1.2 and which is not in client mode.
+     */
+    @Test
+    public void testSslFactoryConfiguration() throws Exception {
+        File trustStoreFile = File.createTempFile("truststore", ".jks");
+        // Remove the temporary trust store when the JVM exits instead of leaving it behind.
+        trustStoreFile.deleteOnExit();
+        Map<String, Object> serverSslConfig = TestSslUtils.createSslConfig(false, true, Mode.SERVER, trustStoreFile, "server");
+        SslFactory sslFactory = new SslFactory(Mode.SERVER);
+        sslFactory.configure(serverSslConfig);
+        // host and port are hints
+        SSLEngine engine = sslFactory.createSslEngine("localhost", 0);
+        assertNotNull(engine);
+        String[] expectedProtocols = {"TLSv1.2"};
+        assertArrayEquals(expectedProtocols, engine.getEnabledProtocols());
+        assertEquals(false, engine.getUseClientMode());
+    }
+
+    /**
+     * A factory configured in client mode must create an engine that is in client mode.
+     */
+    @Test
+    public void testClientMode() throws Exception {
+        File trustStoreFile = File.createTempFile("truststore", ".jks");
+        // Remove the temporary trust store when the JVM exits instead of leaving it behind.
+        trustStoreFile.deleteOnExit();
+        Map<String, Object> clientSslConfig = TestSslUtils.createSslConfig(false, true, Mode.CLIENT, trustStoreFile, "client");
+        SslFactory sslFactory = new SslFactory(Mode.CLIENT);
+        sslFactory.configure(clientSslConfig);
+        // host and port are hints
+        SSLEngine engine = sslFactory.createSslEngine("localhost", 0);
+        assertTrue(engine.getUseClientMode());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/clients/src/test/java/org/apache/kafka/test/TestSSLUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestSSLUtils.java b/clients/src/test/java/org/apache/kafka/test/TestSSLUtils.java
deleted file mode 100644
index b231692..0000000
--- a/clients/src/test/java/org/apache/kafka/test/TestSSLUtils.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.test;
-
-import org.apache.kafka.common.config.SSLConfigs;
-import org.apache.kafka.common.network.Mode;
-import org.apache.kafka.clients.CommonClientConfigs;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.EOFException;
-import java.math.BigInteger;
-import javax.net.ssl.TrustManagerFactory;
-import java.security.*;
-import java.security.cert.Certificate;
-import java.security.cert.CertificateException;
-import java.security.cert.X509Certificate;
-
-import org.bouncycastle.asn1.x500.X500Name;
-import org.bouncycastle.asn1.x509.AlgorithmIdentifier;
-import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo;
-import org.bouncycastle.cert.X509CertificateHolder;
-import org.bouncycastle.cert.X509v1CertificateBuilder;
-import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter;
-import org.bouncycastle.crypto.params.AsymmetricKeyParameter;
-import org.bouncycastle.crypto.util.PrivateKeyFactory;
-import org.bouncycastle.jce.provider.BouncyCastleProvider;
-import org.bouncycastle.operator.ContentSigner;
-import org.bouncycastle.operator.DefaultDigestAlgorithmIdentifierFinder;
-import org.bouncycastle.operator.DefaultSignatureAlgorithmIdentifierFinder;
-import org.bouncycastle.operator.bc.BcRSAContentSignerBuilder;
-
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.List;
-import java.util.ArrayList;
-
-
-public class TestSSLUtils {
-
-    /**
-     * Create a self-signed X.509 Certificate.
-     * From http://bfo.com/blog/2011/03/08/odds_and_ends_creating_a_new_x_509_certificate.html.
-     *
-     * @param dn the X.509 Distinguished Name, eg "CN=Test, L=London, C=GB"
-     * @param pair the KeyPair
-     * @param days how many days from now the Certificate is valid for
-     * @param algorithm the signing algorithm, eg "SHA1withRSA"
-     * @return the self-signed certificate
-     * @throws CertificateException thrown if a security error or an IO error ocurred.
-     */
-    public static X509Certificate generateCertificate(String dn, KeyPair pair,
-                                                      int days, String algorithm)
-        throws  CertificateException {
-
-        try {
-            Security.addProvider(new BouncyCastleProvider());
-            AlgorithmIdentifier sigAlgId = new DefaultSignatureAlgorithmIdentifierFinder().find(algorithm);
-            AlgorithmIdentifier digAlgId = new DefaultDigestAlgorithmIdentifierFinder().find(sigAlgId);
-            AsymmetricKeyParameter privateKeyAsymKeyParam = PrivateKeyFactory.createKey(pair.getPrivate().getEncoded());
-            SubjectPublicKeyInfo subPubKeyInfo = SubjectPublicKeyInfo.getInstance(pair.getPublic().getEncoded());
-            ContentSigner sigGen = new BcRSAContentSignerBuilder(sigAlgId, digAlgId).build(privateKeyAsymKeyParam);
-            X500Name name = new X500Name(dn);
-            Date from = new Date();
-            Date to = new Date(from.getTime() + days * 86400000L);
-            BigInteger sn = new BigInteger(64, new SecureRandom());
-
-            X509v1CertificateBuilder v1CertGen = new X509v1CertificateBuilder(name, sn, from, to, name, subPubKeyInfo);
-            X509CertificateHolder certificateHolder = v1CertGen.build(sigGen);
-            return new JcaX509CertificateConverter().setProvider("BC").getCertificate(certificateHolder);
-        } catch (CertificateException ce) {
-            throw ce;
-        } catch (Exception e) {
-            throw new CertificateException(e);
-        }
-    }
-
-    public static KeyPair generateKeyPair(String algorithm) throws NoSuchAlgorithmException {
-        KeyPairGenerator keyGen = KeyPairGenerator.getInstance(algorithm);
-        keyGen.initialize(1024);
-        return keyGen.genKeyPair();
-    }
-
-    private static KeyStore createEmptyKeyStore() throws GeneralSecurityException, IOException {
-        KeyStore ks = KeyStore.getInstance("JKS");
-        ks.load(null, null); // initialize
-        return ks;
-    }
-
-    private static void saveKeyStore(KeyStore ks, String filename,
-                                     String password) throws GeneralSecurityException, IOException {
-        FileOutputStream out = new FileOutputStream(filename);
-        try {
-            ks.store(out, password.toCharArray());
-        } finally {
-            out.close();
-        }
-    }
-
-    public static void createKeyStore(String filename,
-                                      String password, String alias,
-                                      Key privateKey, Certificate cert) throws GeneralSecurityException, IOException {
-        KeyStore ks = createEmptyKeyStore();
-        ks.setKeyEntry(alias, privateKey, password.toCharArray(),
-                new Certificate[]{cert});
-        saveKeyStore(ks, filename, password);
-    }
-
-    /**
-     * Creates a keystore with a single key and saves it to a file.
-     *
-     * @param filename String file to save
-     * @param password String store password to set on keystore
-     * @param keyPassword String key password to set on key
-     * @param alias String alias to use for the key
-     * @param privateKey Key to save in keystore
-     * @param cert Certificate to use as certificate chain associated to key
-     * @throws GeneralSecurityException for any error with the security APIs
-     * @throws IOException if there is an I/O error saving the file
-     */
-    public static void createKeyStore(String filename,
-                                      String password, String keyPassword, String alias,
-                                      Key privateKey, Certificate cert) throws GeneralSecurityException, IOException {
-        KeyStore ks = createEmptyKeyStore();
-        ks.setKeyEntry(alias, privateKey, keyPassword.toCharArray(),
-                new Certificate[]{cert});
-        saveKeyStore(ks, filename, password);
-    }
-
-    public static void createTrustStore(String filename,
-                                        String password, String alias,
-                                        Certificate cert) throws GeneralSecurityException, IOException {
-        KeyStore ks = createEmptyKeyStore();
-        ks.setCertificateEntry(alias, cert);
-        saveKeyStore(ks, filename, password);
-    }
-
-    public static <T extends Certificate> void createTrustStore(
-            String filename, String password, Map<String, T> certs) throws GeneralSecurityException, IOException {
-        KeyStore ks = KeyStore.getInstance("JKS");
-        try {
-            FileInputStream in = new FileInputStream(filename);
-            ks.load(in, password.toCharArray());
-            in.close();
-        } catch (EOFException e) {
-            ks = createEmptyKeyStore();
-        }
-        for (Map.Entry<String, T> cert : certs.entrySet()) {
-            ks.setCertificateEntry(cert.getKey(), cert.getValue());
-        }
-        saveKeyStore(ks, filename, password);
-    }
-
-    public static Map<String, X509Certificate> createX509Certificates(KeyPair keyPair)
-        throws GeneralSecurityException {
-        Map<String, X509Certificate> certs = new HashMap<String, X509Certificate>();
-        X509Certificate cert = generateCertificate("CN=localhost, O=localhost", keyPair, 30, "SHA1withRSA");
-        certs.put("localhost", cert);
-        return certs;
-    }
-
-    public static Map<String, Object> createSSLConfig(Mode mode, File keyStoreFile, String password, String keyPassword,
-                                                      File trustStoreFile, String trustStorePassword) {
-        Map<String, Object> sslConfigs = new HashMap<String, Object>();
-        sslConfigs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); // kafka security protocol
-        sslConfigs.put(SSLConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2"); // protocol to create SSLContext
-
-        if (mode == Mode.SERVER || (mode == Mode.CLIENT && keyStoreFile != null)) {
-            sslConfigs.put(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStoreFile.getPath());
-            sslConfigs.put(SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
-            sslConfigs.put(SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, TrustManagerFactory.getDefaultAlgorithm());
-            sslConfigs.put(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, password);
-            sslConfigs.put(SSLConfigs.SSL_KEY_PASSWORD_CONFIG, keyPassword);
-        }
-
-        sslConfigs.put(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreFile.getPath());
-        sslConfigs.put(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePassword);
-        sslConfigs.put(SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");
-        sslConfigs.put(SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, TrustManagerFactory.getDefaultAlgorithm());
-
-        List<String> enabledProtocols  = new ArrayList<String>();
-        enabledProtocols.add("TLSv1.2");
-        sslConfigs.put(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, enabledProtocols);
-
-        return sslConfigs;
-    }
-
-    public static  Map<String, Object> createSSLConfig(boolean useClientCert, boolean trustStore, Mode mode, File trustStoreFile, String certAlias)
-        throws IOException, GeneralSecurityException {
-        Map<String, X509Certificate> certs = new HashMap<String, X509Certificate>();
-        File keyStoreFile;
-        String password;
-
-        if (mode == Mode.SERVER)
-            password = "ServerPassword";
-        else
-            password = "ClientPassword";
-
-        String trustStorePassword = "TrustStorePassword";
-
-        if (useClientCert) {
-            keyStoreFile = File.createTempFile("clientKS", ".jks");
-            KeyPair cKP = generateKeyPair("RSA");
-            X509Certificate cCert = generateCertificate("CN=localhost, O=client", cKP, 30, "SHA1withRSA");
-            createKeyStore(keyStoreFile.getPath(), password, "client", cKP.getPrivate(), cCert);
-            certs.put(certAlias, cCert);
-        } else {
-            keyStoreFile = File.createTempFile("serverKS", ".jks");
-            KeyPair sKP = generateKeyPair("RSA");
-            X509Certificate sCert = generateCertificate("CN=localhost, O=server", sKP, 30,
-                                                        "SHA1withRSA");
-            createKeyStore(keyStoreFile.getPath(), password, password, "server", sKP.getPrivate(), sCert);
-            certs.put(certAlias, sCert);
-        }
-
-        if (trustStore) {
-            createTrustStore(trustStoreFile.getPath(), trustStorePassword, certs);
-        }
-
-        Map<String, Object> sslConfig = createSSLConfig(mode, keyStoreFile, password,
-                                                        password, trustStoreFile, trustStorePassword);
-        return sslConfig;
-    }
-
-}