You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sr...@apache.org on 2015/10/24 18:45:07 UTC

[4/4] kafka git commit: KAFKA-2460; Fix capitalisation in SSL classes

KAFKA-2460; Fix capitalisation in SSL classes

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Sriharsha Chintalapani <ha...@hortonworks.com>

Closes #355 from ijuma/kafka-2460-fix-capitalisation-in-ssl-classes


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/16f194b2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/16f194b2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/16f194b2

Branch: refs/heads/trunk
Commit: 16f194b20ad9795188f1d7781e7cbca1cd2a6a2d
Parents: 6f2f1f9
Author: Ismael Juma <is...@juma.me.uk>
Authored: Sat Oct 24 09:42:19 2015 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Sat Oct 24 09:42:19 2015 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/ConsumerConfig.java  |  32 +-
 .../kafka/clients/producer/ProducerConfig.java  |  32 +-
 .../apache/kafka/common/config/SSLConfigs.java  |  99 ---
 .../apache/kafka/common/config/SslConfigs.java  |  99 +++
 .../kafka/common/network/ChannelBuilders.java   |   2 +-
 .../common/network/PlaintextChannelBuilder.java |   5 +-
 .../kafka/common/network/SSLChannelBuilder.java |  75 --
 .../kafka/common/network/SSLTransportLayer.java | 734 -------------------
 .../common/network/SaslChannelBuilder.java      |  14 +-
 .../kafka/common/network/SslChannelBuilder.java |  75 ++
 .../kafka/common/network/SslTransportLayer.java | 733 ++++++++++++++++++
 .../kafka/common/network/TransportLayer.java    |   3 +-
 .../kafka/common/security/ssl/SSLFactory.java   | 198 -----
 .../kafka/common/security/ssl/SslFactory.java   | 198 +++++
 .../clients/producer/KafkaProducerTest.java     |   4 +-
 .../apache/kafka/common/network/EchoServer.java |   6 +-
 .../kafka/common/network/SSLSelectorTest.java   | 170 -----
 .../common/network/SSLTransportLayerTest.java   | 652 ----------------
 .../kafka/common/network/SelectorTest.java      |   4 +-
 .../kafka/common/network/SslSelectorTest.java   | 170 +++++
 .../common/network/SslTransportLayerTest.java   | 651 ++++++++++++++++
 .../common/security/ssl/SSLFactoryTest.java     |  61 --
 .../common/security/ssl/SslFactoryTest.java     |  61 ++
 .../org/apache/kafka/test/TestSSLUtils.java     | 243 ------
 .../org/apache/kafka/test/TestSslUtils.java     | 242 ++++++
 .../distributed/DistributedHerderConfig.java    |  32 +-
 .../main/scala/kafka/api/FetchResponse.scala    |   2 +-
 core/src/main/scala/kafka/cluster/Broker.scala  |   6 +-
 .../main/scala/kafka/server/KafkaConfig.scala   | 146 ++--
 .../kafka/server/ReplicaFetcherThread.scala     |   2 +-
 .../unit/kafka/network/SocketServerTest.scala   |   2 +-
 .../unit/kafka/server/KafkaConfigTest.scala     |  30 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |   8 +-
 33 files changed, 2392 insertions(+), 2399 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 5cc0419..14c54c2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -17,7 +17,7 @@ import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
-import org.apache.kafka.common.config.SSLConfigs;
+import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.serialization.Deserializer;
 
@@ -287,21 +287,21 @@ public class ConsumerConfig extends AbstractConfig {
                                         Importance.HIGH,
                                         VALUE_DESERIALIZER_CLASS_DOC)
                                 .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, Type.STRING, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC)
-                                .define(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Type.CLASS, SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, Importance.LOW, SSLConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
-                                .define(SSLConfigs.SSL_PROTOCOL_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_PROTOCOL, Importance.MEDIUM, SSLConfigs.SSL_PROTOCOL_DOC)
-                                .define(SSLConfigs.SSL_PROVIDER_CONFIG, Type.STRING, Importance.MEDIUM, SSLConfigs.SSL_PROVIDER_DOC, false)
-                                .define(SSLConfigs.SSL_CIPHER_SUITES_CONFIG, Type.LIST, Importance.LOW, SSLConfigs.SSL_CIPHER_SUITES_DOC, false)
-                                .define(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Type.LIST, SSLConfigs.DEFAULT_ENABLED_PROTOCOLS, Importance.MEDIUM, SSLConfigs.SSL_ENABLED_PROTOCOLS_DOC)
-                                .define(SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_KEYSTORE_TYPE, Importance.MEDIUM, SSLConfigs.SSL_KEYSTORE_TYPE_DOC)
-                                .define(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_KEYSTORE_LOCATION_DOC, false)
-                                .define(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_KEYSTORE_PASSWORD_DOC, false)
-                                .define(SSLConfigs.SSL_KEY_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_KEY_PASSWORD_DOC, false)
-                                .define(SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, Importance.MEDIUM, SSLConfigs.SSL_TRUSTSTORE_TYPE_DOC)
-                                .define(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_TRUSTSTORE_LOCATION_DOC, false)
-                                .define(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_TRUSTSTORE_PASSWORD_DOC, false)
-                                .define(SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, Importance.LOW, SSLConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
-                                .define(SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, Importance.LOW, SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
-                                .define(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, Type.STRING, Importance.LOW, SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false)
+                                .define(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Type.CLASS, SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, Importance.LOW, SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
+                                .define(SslConfigs.SSL_PROTOCOL_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_PROTOCOL, Importance.MEDIUM, SslConfigs.SSL_PROTOCOL_DOC)
+                                .define(SslConfigs.SSL_PROVIDER_CONFIG, Type.STRING, Importance.MEDIUM, SslConfigs.SSL_PROVIDER_DOC, false)
+                                .define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Type.LIST, Importance.LOW, SslConfigs.SSL_CIPHER_SUITES_DOC, false)
+                                .define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Type.LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS, Importance.MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC)
+                                .define(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, Importance.MEDIUM, SslConfigs.SSL_KEYSTORE_TYPE_DOC)
+                                .define(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEYSTORE_LOCATION_DOC, false)
+                                .define(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEYSTORE_PASSWORD_DOC, false)
+                                .define(SslConfigs.SSL_KEY_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEY_PASSWORD_DOC, false)
+                                .define(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, Importance.MEDIUM, SslConfigs.SSL_TRUSTSTORE_TYPE_DOC)
+                                .define(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC, false)
+                                .define(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC, false)
+                                .define(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, Importance.LOW, SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
+                                .define(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, Importance.LOW, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
+                                .define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, Type.STRING, Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false)
                                 .define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, Type.STRING, Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC, false)
                                 .define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC)
                                 .define(SaslConfigs.SASL_KAFKA_SERVER_REALM, Type.STRING, Importance.LOW, SaslConfigs.SASL_KAFKA_SERVER_DOC, false)

http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index b3cfe70..6d40b77 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -22,7 +22,7 @@ import java.util.Properties;
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.SSLConfigs;
+import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
@@ -265,21 +265,21 @@ public class ProducerConfig extends AbstractConfig {
                                 .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, KEY_SERIALIZER_CLASS_DOC)
                                 .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC)
                                 .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, Type.STRING, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC)
-                                .define(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Type.CLASS, SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, Importance.LOW, SSLConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
-                                .define(SSLConfigs.SSL_PROTOCOL_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_PROTOCOL, Importance.MEDIUM, SSLConfigs.SSL_PROTOCOL_DOC)
-                                .define(SSLConfigs.SSL_PROVIDER_CONFIG, Type.STRING, Importance.MEDIUM, SSLConfigs.SSL_PROVIDER_DOC, false)
-                                .define(SSLConfigs.SSL_CIPHER_SUITES_CONFIG, Type.LIST, Importance.LOW, SSLConfigs.SSL_CIPHER_SUITES_DOC, false)
-                                .define(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Type.LIST, SSLConfigs.DEFAULT_ENABLED_PROTOCOLS, Importance.MEDIUM, SSLConfigs.SSL_ENABLED_PROTOCOLS_DOC)
-                                .define(SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_KEYSTORE_TYPE, Importance.MEDIUM, SSLConfigs.SSL_KEYSTORE_TYPE_DOC)
-                                .define(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_KEYSTORE_LOCATION_DOC, false)
-                                .define(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_KEYSTORE_PASSWORD_DOC, false)
-                                .define(SSLConfigs.SSL_KEY_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_KEY_PASSWORD_DOC, false)
-                                .define(SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, Importance.MEDIUM, SSLConfigs.SSL_TRUSTSTORE_TYPE_DOC)
-                                .define(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_TRUSTSTORE_LOCATION_DOC, false)
-                                .define(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_TRUSTSTORE_PASSWORD_DOC, false)
-                                .define(SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, Importance.LOW, SSLConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
-                                .define(SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, Importance.LOW, SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
-                                .define(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, Type.STRING, Importance.LOW, SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false)
+                                .define(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Type.CLASS, SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, Importance.LOW, SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
+                                .define(SslConfigs.SSL_PROTOCOL_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_PROTOCOL, Importance.MEDIUM, SslConfigs.SSL_PROTOCOL_DOC)
+                                .define(SslConfigs.SSL_PROVIDER_CONFIG, Type.STRING, Importance.MEDIUM, SslConfigs.SSL_PROVIDER_DOC, false)
+                                .define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Type.LIST, Importance.LOW, SslConfigs.SSL_CIPHER_SUITES_DOC, false)
+                                .define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Type.LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS, Importance.MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC)
+                                .define(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, Importance.MEDIUM, SslConfigs.SSL_KEYSTORE_TYPE_DOC)
+                                .define(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEYSTORE_LOCATION_DOC, false)
+                                .define(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEYSTORE_PASSWORD_DOC, false)
+                                .define(SslConfigs.SSL_KEY_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEY_PASSWORD_DOC, false)
+                                .define(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, Importance.MEDIUM, SslConfigs.SSL_TRUSTSTORE_TYPE_DOC)
+                                .define(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC, false)
+                                .define(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC, false)
+                                .define(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, Importance.LOW, SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
+                                .define(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, Importance.LOW, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
+                                .define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, Type.STRING, Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false)
                                 .define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, Type.STRING, Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC, false)
                                 .define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC)
                                 .define(SaslConfigs.SASL_KAFKA_SERVER_REALM, Type.STRING, Importance.LOW, SaslConfigs.SASL_KAFKA_SERVER_DOC, false)

http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java
deleted file mode 100644
index 207a349..0000000
--- a/clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package org.apache.kafka.common.config;
-
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.TrustManagerFactory;
-
-public class SSLConfigs {
-    /*
-     * NOTE: DO NOT CHANGE EITHER CONFIG NAMES AS THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE.
-     */
-
-    public static final String PRINCIPAL_BUILDER_CLASS_CONFIG = "principal.builder.class";
-    public static final String PRINCIPAL_BUILDER_CLASS_DOC = "principal builder to generate a java Principal. This config is optional for client.";
-    public static final String DEFAULT_PRINCIPAL_BUILDER_CLASS = "org.apache.kafka.common.security.auth.DefaultPrincipalBuilder";
-
-    public static final String SSL_PROTOCOL_CONFIG = "ssl.protocol";
-    public static final String SSL_PROTOCOL_DOC = "The SSL protocol used to generate the SSLContext. "
-            + "Default setting is TLS, which is fine for most cases. "
-            + "Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 "
-            + "may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities.";
-
-    public static final String DEFAULT_SSL_PROTOCOL = "TLS";
-
-    public static final String SSL_PROVIDER_CONFIG = "ssl.provider";
-    public static final String SSL_PROVIDER_DOC = "The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.";
-
-    public static final String SSL_CIPHER_SUITES_CONFIG = "ssl.cipher.suites";
-    public static final String SSL_CIPHER_SUITES_DOC = "A list of cipher suites. This is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol."
-            + "By default all the available cipher suites are supported.";
-
-    public static final String SSL_ENABLED_PROTOCOLS_CONFIG = "ssl.enabled.protocols";
-    public static final String SSL_ENABLED_PROTOCOLS_DOC = "The list of protocols enabled for SSL connections. "
-            + "TLSv1.2, TLSv1.1 and TLSv1 are enabled by default.";
-    public static final String DEFAULT_ENABLED_PROTOCOLS = "TLSv1.2,TLSv1.1,TLSv1";
-
-    public static final String SSL_KEYSTORE_TYPE_CONFIG = "ssl.keystore.type";
-    public static final String SSL_KEYSTORE_TYPE_DOC = "The file format of the key store file. "
-            + "This is optional for client. Default value is JKS";
-    public static final String DEFAULT_SSL_KEYSTORE_TYPE = "JKS";
-
-    public static final String SSL_KEYSTORE_LOCATION_CONFIG = "ssl.keystore.location";
-    public static final String SSL_KEYSTORE_LOCATION_DOC = "The location of the key store file. "
-        + "This is optional for client and can be used for two-way authentication for client.";
-
-    public static final String SSL_KEYSTORE_PASSWORD_CONFIG = "ssl.keystore.password";
-    public static final String SSL_KEYSTORE_PASSWORD_DOC = "The store password for the key store file."
-        + "This is optional for client and only needed if ssl.keystore.location is configured. ";
-
-    public static final String SSL_KEY_PASSWORD_CONFIG = "ssl.key.password";
-    public static final String SSL_KEY_PASSWORD_DOC = "The password of the private key in the key store file. "
-            + "This is optional for client.";
-
-    public static final String SSL_TRUSTSTORE_TYPE_CONFIG = "ssl.truststore.type";
-    public static final String SSL_TRUSTSTORE_TYPE_DOC = "The file format of the trust store file. "
-            + "Default value is JKS.";
-    public static final String DEFAULT_SSL_TRUSTSTORE_TYPE = "JKS";
-
-    public static final String SSL_TRUSTSTORE_LOCATION_CONFIG = "ssl.truststore.location";
-    public static final String SSL_TRUSTSTORE_LOCATION_DOC = "The location of the trust store file. ";
-
-    public static final String SSL_TRUSTSTORE_PASSWORD_CONFIG = "ssl.truststore.password";
-    public static final String SSL_TRUSTSTORE_PASSWORD_DOC = "The password for the trust store file. ";
-
-    public static final String SSL_KEYMANAGER_ALGORITHM_CONFIG = "ssl.keymanager.algorithm";
-    public static final String SSL_KEYMANAGER_ALGORITHM_DOC = "The algorithm used by key manager factory for SSL connections. "
-            + "Default value is the key manager factory algorithm configured for the Java Virtual Machine.";
-    public static final String DEFAULT_SSL_KEYMANGER_ALGORITHM = KeyManagerFactory.getDefaultAlgorithm();
-
-    public static final String SSL_TRUSTMANAGER_ALGORITHM_CONFIG = "ssl.trustmanager.algorithm";
-    public static final String SSL_TRUSTMANAGER_ALGORITHM_DOC = "The algorithm used by trust manager factory for SSL connections. "
-            + "Default value is the trust manager factory algorithm configured for the Java Virtual Machine.";
-    public static final String DEFAULT_SSL_TRUSTMANAGER_ALGORITHM = TrustManagerFactory.getDefaultAlgorithm();
-
-    public static final String SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG = "ssl.endpoint.identification.algorithm";
-    public static final String SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC = "The endpoint identification algorithm to validate server hostname using server certificate. ";
-
-    public static final String SSL_CLIENT_AUTH_CONFIG = "ssl.client.auth";
-    public static final String SSL_CLIENT_AUTH_DOC = "Configures kafka broker to request client authentication."
-                                           + " The following settings are common: "
-                                           + " <ul>"
-                                           + " <li><code>ssl.want.client.auth=required</code> If set to required"
-                                           + " client authentication is required."
-                                           + " <li><code>ssl.client.auth=requested</code> This means client authentication is optional."
-                                           + " unlike requested , if this option is set client can choose not to provide authentication information about itself"
-                                           + " <li><code>ssl.client.auth=none</code> This means client authentication is not needed.";
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
new file mode 100644
index 0000000..60e1eb3
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common.config;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.TrustManagerFactory;
+
+public class SslConfigs {
+    /*
+     * NOTE: DO NOT CHANGE EITHER CONFIG NAMES AS THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE.
+     */
+
+    public static final String PRINCIPAL_BUILDER_CLASS_CONFIG = "principal.builder.class";
+    public static final String PRINCIPAL_BUILDER_CLASS_DOC = "principal builder to generate a java Principal. This config is optional for client.";
+    public static final String DEFAULT_PRINCIPAL_BUILDER_CLASS = "org.apache.kafka.common.security.auth.DefaultPrincipalBuilder";
+
+    public static final String SSL_PROTOCOL_CONFIG = "ssl.protocol";
+    public static final String SSL_PROTOCOL_DOC = "The SSL protocol used to generate the SSLContext. "
+            + "Default setting is TLS, which is fine for most cases. "
+            + "Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 "
+            + "may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities.";
+
+    public static final String DEFAULT_SSL_PROTOCOL = "TLS";
+
+    public static final String SSL_PROVIDER_CONFIG = "ssl.provider";
+    public static final String SSL_PROVIDER_DOC = "The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.";
+
+    public static final String SSL_CIPHER_SUITES_CONFIG = "ssl.cipher.suites";
+    public static final String SSL_CIPHER_SUITES_DOC = "A list of cipher suites. This is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol."
+            + "By default all the available cipher suites are supported.";
+
+    public static final String SSL_ENABLED_PROTOCOLS_CONFIG = "ssl.enabled.protocols";
+    public static final String SSL_ENABLED_PROTOCOLS_DOC = "The list of protocols enabled for SSL connections. "
+            + "TLSv1.2, TLSv1.1 and TLSv1 are enabled by default.";
+    public static final String DEFAULT_SSL_ENABLED_PROTOCOLS = "TLSv1.2,TLSv1.1,TLSv1";
+
+    public static final String SSL_KEYSTORE_TYPE_CONFIG = "ssl.keystore.type";
+    public static final String SSL_KEYSTORE_TYPE_DOC = "The file format of the key store file. "
+            + "This is optional for client. Default value is JKS";
+    public static final String DEFAULT_SSL_KEYSTORE_TYPE = "JKS";
+
+    public static final String SSL_KEYSTORE_LOCATION_CONFIG = "ssl.keystore.location";
+    public static final String SSL_KEYSTORE_LOCATION_DOC = "The location of the key store file. "
+        + "This is optional for client and can be used for two-way authentication for client.";
+
+    public static final String SSL_KEYSTORE_PASSWORD_CONFIG = "ssl.keystore.password";
+    public static final String SSL_KEYSTORE_PASSWORD_DOC = "The store password for the key store file."
+        + "This is optional for client and only needed if ssl.keystore.location is configured. ";
+
+    public static final String SSL_KEY_PASSWORD_CONFIG = "ssl.key.password";
+    public static final String SSL_KEY_PASSWORD_DOC = "The password of the private key in the key store file. "
+            + "This is optional for client.";
+
+    public static final String SSL_TRUSTSTORE_TYPE_CONFIG = "ssl.truststore.type";
+    public static final String SSL_TRUSTSTORE_TYPE_DOC = "The file format of the trust store file. "
+            + "Default value is JKS.";
+    public static final String DEFAULT_SSL_TRUSTSTORE_TYPE = "JKS";
+
+    public static final String SSL_TRUSTSTORE_LOCATION_CONFIG = "ssl.truststore.location";
+    public static final String SSL_TRUSTSTORE_LOCATION_DOC = "The location of the trust store file. ";
+
+    public static final String SSL_TRUSTSTORE_PASSWORD_CONFIG = "ssl.truststore.password";
+    public static final String SSL_TRUSTSTORE_PASSWORD_DOC = "The password for the trust store file. ";
+
+    public static final String SSL_KEYMANAGER_ALGORITHM_CONFIG = "ssl.keymanager.algorithm";
+    public static final String SSL_KEYMANAGER_ALGORITHM_DOC = "The algorithm used by key manager factory for SSL connections. "
+            + "Default value is the key manager factory algorithm configured for the Java Virtual Machine.";
+    public static final String DEFAULT_SSL_KEYMANGER_ALGORITHM = KeyManagerFactory.getDefaultAlgorithm();
+
+    public static final String SSL_TRUSTMANAGER_ALGORITHM_CONFIG = "ssl.trustmanager.algorithm";
+    public static final String SSL_TRUSTMANAGER_ALGORITHM_DOC = "The algorithm used by trust manager factory for SSL connections. "
+            + "Default value is the trust manager factory algorithm configured for the Java Virtual Machine.";
+    public static final String DEFAULT_SSL_TRUSTMANAGER_ALGORITHM = TrustManagerFactory.getDefaultAlgorithm();
+
+    public static final String SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG = "ssl.endpoint.identification.algorithm";
+    public static final String SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC = "The endpoint identification algorithm to validate server hostname using server certificate. ";
+
+    public static final String SSL_CLIENT_AUTH_CONFIG = "ssl.client.auth";
+    public static final String SSL_CLIENT_AUTH_DOC = "Configures kafka broker to request client authentication."
+                                           + " The following settings are common: "
+                                           + " <ul>"
+                                           + " <li><code>ssl.want.client.auth=required</code> If set to required"
+                                           + " client authentication is required."
+                                           + " <li><code>ssl.client.auth=requested</code> This means client authentication is optional."
+                                           + " unlike requested , if this option is set client can choose not to provide authentication information about itself"
+                                           + " <li><code>ssl.client.auth=none</code> This means client authentication is not needed.";
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
index 1e5d840..03c663d 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
@@ -36,7 +36,7 @@ public class ChannelBuilders {
         switch (securityProtocol) {
             case SSL:
                 requireNonNullMode(mode, securityProtocol);
-                channelBuilder = new SSLChannelBuilder(mode);
+                channelBuilder = new SslChannelBuilder(mode);
                 break;
             case SASL_SSL:
             case SASL_PLAINTEXT:

http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
index a028159..bc1536a 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
@@ -15,15 +15,14 @@ package org.apache.kafka.common.network;
 import java.nio.channels.SelectionKey;
 import java.util.Map;
 
+import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.security.auth.PrincipalBuilder;
-import org.apache.kafka.common.config.SSLConfigs;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.common.KafkaException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 public class PlaintextChannelBuilder implements ChannelBuilder {
     private static final Logger log = LoggerFactory.getLogger(PlaintextChannelBuilder.class);
     private PrincipalBuilder principalBuilder;
@@ -32,7 +31,7 @@ public class PlaintextChannelBuilder implements ChannelBuilder {
     public void configure(Map<String, ?> configs) throws KafkaException {
         try {
             this.configs = configs;
-            this.principalBuilder = (PrincipalBuilder) Utils.newInstance((Class<?>) configs.get(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG));
+            this.principalBuilder = (PrincipalBuilder) Utils.newInstance((Class<?>) configs.get(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG));
             this.principalBuilder.configure(this.configs);
         } catch (Exception e) {
             throw new KafkaException(e);

http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java
deleted file mode 100644
index 1dd1ecd..0000000
--- a/clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.common.network;
-
-import java.io.IOException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.util.Map;
-
-import org.apache.kafka.common.security.auth.PrincipalBuilder;
-import org.apache.kafka.common.security.ssl.SSLFactory;
-import org.apache.kafka.common.config.SSLConfigs;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.common.KafkaException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SSLChannelBuilder implements ChannelBuilder {
-    private static final Logger log = LoggerFactory.getLogger(SSLChannelBuilder.class);
-    private SSLFactory sslFactory;
-    private PrincipalBuilder principalBuilder;
-    private Mode mode;
-    private Map<String, ?> configs;
-
-    public SSLChannelBuilder(Mode mode) {
-        this.mode = mode;
-    }
-
-    public void configure(Map<String, ?> configs) throws KafkaException {
-        try {
-            this.configs = configs;
-            this.sslFactory = new SSLFactory(mode);
-            this.sslFactory.configure(this.configs);
-            this.principalBuilder = (PrincipalBuilder) Utils.newInstance((Class<?>) configs.get(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG));
-            this.principalBuilder.configure(this.configs);
-        } catch (Exception e) {
-            throw new KafkaException(e);
-        }
-    }
-
-    public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException {
-        KafkaChannel channel = null;
-        try {
-            SSLTransportLayer transportLayer = buildTransportLayer(sslFactory, id, key);
-            Authenticator authenticator = new DefaultAuthenticator();
-            authenticator.configure(transportLayer, this.principalBuilder, this.configs);
-            channel = new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
-        } catch (Exception e) {
-            log.info("Failed to create channel due to ", e);
-            throw new KafkaException(e);
-        }
-        return channel;
-    }
-
-    public void close()  {
-        this.principalBuilder.close();
-    }
-
-    protected SSLTransportLayer buildTransportLayer(SSLFactory sslFactory, String id, SelectionKey key) throws IOException {
-        SocketChannel socketChannel = (SocketChannel) key.channel();
-        return SSLTransportLayer.create(id, key,
-            sslFactory.createSSLEngine(socketChannel.socket().getInetAddress().getHostName(),
-            socketChannel.socket().getPort()));
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java
deleted file mode 100644
index 813f0b1..0000000
--- a/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java
+++ /dev/null
@@ -1,734 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.common.network;
-
-import java.io.IOException;
-import java.io.EOFException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.channels.SocketChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.CancelledKeyException;
-
-import java.security.Principal;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLEngineResult;
-import javax.net.ssl.SSLEngineResult.HandshakeStatus;
-import javax.net.ssl.SSLEngineResult.Status;
-import javax.net.ssl.SSLException;
-import javax.net.ssl.SSLHandshakeException;
-import javax.net.ssl.SSLSession;
-import javax.net.ssl.SSLPeerUnverifiedException;
-
-import org.apache.kafka.common.security.auth.KafkaPrincipal;
-import org.apache.kafka.common.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/*
- * Transport layer for SSL communication
- */
-
-public class SSLTransportLayer implements TransportLayer {
-    private static final Logger log = LoggerFactory.getLogger(SSLTransportLayer.class);
-    private final String channelId;
-    private final SSLEngine sslEngine;
-    private final SelectionKey key;
-    private final SocketChannel socketChannel;
-    private final boolean enableRenegotiation;
-
-    private HandshakeStatus handshakeStatus;
-    private SSLEngineResult handshakeResult;
-    private boolean handshakeComplete = false;
-    private boolean closing = false;
-    private ByteBuffer netReadBuffer;
-    private ByteBuffer netWriteBuffer;
-    private ByteBuffer appReadBuffer;
-    private ByteBuffer emptyBuf = ByteBuffer.allocate(0);
-
-    public static SSLTransportLayer create(String channelId, SelectionKey key, SSLEngine sslEngine) throws IOException {
-        // Disable renegotiation by default until we have fixed the known issues with the existing implementation
-        SSLTransportLayer transportLayer = new SSLTransportLayer(channelId, key, sslEngine, false);
-        transportLayer.startHandshake();
-        return transportLayer;
-    }
-
-    // Prefer `create`, only use this in tests
-    SSLTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine, boolean enableRenegotiation) throws IOException {
-        this.channelId = channelId;
-        this.key = key;
-        this.socketChannel = (SocketChannel) key.channel();
-        this.sslEngine = sslEngine;
-        this.enableRenegotiation = enableRenegotiation;
-    }
-
-    /**
-     * starts sslEngine handshake process
-     */
-    protected void startHandshake() throws IOException {
-
-        this.netReadBuffer = ByteBuffer.allocate(netReadBufferSize());
-        this.netWriteBuffer = ByteBuffer.allocate(netWriteBufferSize());
-        this.appReadBuffer = ByteBuffer.allocate(applicationBufferSize());
-        
-        //clear & set netRead & netWrite buffers
-        netWriteBuffer.position(0);
-        netWriteBuffer.limit(0);
-        netReadBuffer.position(0);
-        netReadBuffer.limit(0);
-        handshakeComplete = false;
-        closing = false;
-        //initiate handshake
-        sslEngine.beginHandshake();
-        handshakeStatus = sslEngine.getHandshakeStatus();
-    }
-
-    @Override
-    public boolean ready() {
-        return handshakeComplete;
-    }
-
-    /**
-     * does socketChannel.finishConnect()
-     */
-    @Override
-    public void finishConnect() throws IOException {
-        socketChannel.finishConnect();
-        key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
-    }
-
-    /**
-     * disconnects selectionKey.
-     */
-    @Override
-    public void disconnect() {
-        key.cancel();
-    }
-
-    @Override
-    public SocketChannel socketChannel() {
-        return socketChannel;
-    }
-
-    @Override
-    public boolean isOpen() {
-        return socketChannel.isOpen();
-    }
-
-    @Override
-    public boolean isConnected() {
-        return socketChannel.isConnected();
-    }
-
-
-    /**
-    * Sends a SSL close message and closes socketChannel.
-    * @throws IOException if an I/O error occurs
-    * @throws IOException if there is data on the outgoing network buffer and we are unable to flush it
-    */
-    @Override
-    public void close() throws IOException {
-        if (closing) return;
-        closing = true;
-        sslEngine.closeOutbound();
-        try {
-            if (!flush(netWriteBuffer)) {
-                throw new IOException("Remaining data in the network buffer, can't send SSL close message.");
-            }
-            //prep the buffer for the close message
-            netWriteBuffer.clear();
-            //perform the close, since we called sslEngine.closeOutbound
-            SSLEngineResult handshake = sslEngine.wrap(emptyBuf, netWriteBuffer);
-            //we should be in a close state
-            if (handshake.getStatus() != SSLEngineResult.Status.CLOSED) {
-                throw new IOException("Invalid close state, will not send network data.");
-            }
-            netWriteBuffer.flip();
-            flush(netWriteBuffer);
-            socketChannel.socket().close();
-            socketChannel.close();
-        } catch (IOException ie) {
-            log.warn("Failed to send SSL Close message ", ie);
-        }
-        key.attach(null);
-        key.cancel();
-    }
-
-    /**
-     * returns true if there are any pending contents in netWriteBuffer
-     */
-    @Override
-    public boolean hasPendingWrites() {
-        return netWriteBuffer.hasRemaining();
-    }
-
-    /**
-    * Flushes the buffer to the network, non blocking
-    * @param buf ByteBuffer
-    * @return boolean true if the buffer has been emptied out, false otherwise
-    * @throws IOException
-    */
-    private boolean flush(ByteBuffer buf) throws IOException {
-        int remaining = buf.remaining();
-        if (remaining > 0) {
-            int written = socketChannel.write(buf);
-            return written >= remaining;
-        }
-        return true;
-    }
-
-    /**
-    * Performs SSL handshake, non blocking.
-    * Before application data (kafka protocols) can be sent client & kafka broker must
-    * perform ssl handshake.
-    * During the handshake SSLEngine generates encrypted data that will be transported over socketChannel.
-    * Each SSLEngine operation generates SSLEngineResult , of which SSLEngineResult.handshakeStatus field is used to
-    * determine what operation needs to occur to move handshake along.
-    * A typical handshake might look like this.
-    * +-------------+----------------------------------+-------------+
-    * |  client     |  SSL/TLS message                 | HSStatus    |
-    * +-------------+----------------------------------+-------------+
-    * | wrap()      | ClientHello                      | NEED_UNWRAP |
-    * | unwrap()    | ServerHello/Cert/ServerHelloDone | NEED_WRAP   |
-    * | wrap()      | ClientKeyExchange                | NEED_WRAP   |
-    * | wrap()      | ChangeCipherSpec                 | NEED_WRAP   |
-    * | wrap()      | Finished                         | NEED_UNWRAP |
-    * | unwrap()    | ChangeCipherSpec                 | NEED_UNWRAP |
-    * | unwrap()    | Finished                         | FINISHED    |
-    * +-------------+----------------------------------+-------------+
-    *
-    * @throws IOException
-    */
-    @Override
-    public void handshake() throws IOException {
-        boolean read = key.isReadable();
-        boolean write = key.isWritable();
-        handshakeComplete = false;
-        handshakeStatus = sslEngine.getHandshakeStatus();
-        if (!flush(netWriteBuffer)) {
-            key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
-            return;
-        }
-        try {
-            switch (handshakeStatus) {
-                case NEED_TASK:
-                    log.trace("SSLHandshake NEED_TASK channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
-                              channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
-                    handshakeStatus = runDelegatedTasks();
-                    break;
-                case NEED_WRAP:
-                    log.trace("SSLHandshake NEED_WRAP channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
-                              channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
-                    handshakeResult = handshakeWrap(write);
-                    if (handshakeResult.getStatus() == Status.BUFFER_OVERFLOW) {
-                        int currentNetWriteBufferSize = netWriteBufferSize();
-                        netWriteBuffer.compact();
-                        netWriteBuffer = Utils.ensureCapacity(netWriteBuffer, currentNetWriteBufferSize);
-                        netWriteBuffer.flip();
-                        if (netWriteBuffer.limit() >= currentNetWriteBufferSize) {
-                            throw new IllegalStateException("Buffer overflow when available data size (" + netWriteBuffer.limit() +
-                                                            ") >= network buffer size (" + currentNetWriteBufferSize + ")");
-                        }
-                    } else if (handshakeResult.getStatus() == Status.BUFFER_UNDERFLOW) {
-                        throw new IllegalStateException("Should not have received BUFFER_UNDERFLOW during handshake WRAP.");
-                    } else if (handshakeResult.getStatus() == Status.CLOSED) {
-                        throw new EOFException();
-                    }
-                    log.trace("SSLHandshake NEED_WRAP channelId {}, handshakeResult {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
-                              channelId, handshakeResult, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
-                    //if handshake status is not NEED_UNWRAP or unable to flush netWriteBuffer contents
-                    //we will break here otherwise we can do need_unwrap in the same call.
-                    if (handshakeStatus != HandshakeStatus.NEED_UNWRAP || !flush(netWriteBuffer)) {
-                        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
-                        break;
-                    }
-                case NEED_UNWRAP:
-                    log.trace("SSLHandshake NEED_UNWRAP channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
-                              channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
-                    do {
-                        handshakeResult = handshakeUnwrap(read);
-                        if (handshakeResult.getStatus() == Status.BUFFER_OVERFLOW) {
-                            int currentAppBufferSize = applicationBufferSize();
-                            appReadBuffer = Utils.ensureCapacity(appReadBuffer, currentAppBufferSize);
-                            if (appReadBuffer.position() > currentAppBufferSize) {
-                                throw new IllegalStateException("Buffer underflow when available data size (" + appReadBuffer.position() +
-                                                                ") > packet buffer size (" + currentAppBufferSize + ")");
-                            }
-                        }
-                    } while (handshakeResult.getStatus() == Status.BUFFER_OVERFLOW);
-                    if (handshakeResult.getStatus() == Status.BUFFER_UNDERFLOW) {
-                        int currentNetReadBufferSize = netReadBufferSize();
-                        netReadBuffer = Utils.ensureCapacity(netReadBuffer, currentNetReadBufferSize);
-                        if (netReadBuffer.position() >= currentNetReadBufferSize) {
-                            throw new IllegalStateException("Buffer underflow when there is available data");
-                        }
-                    } else if (handshakeResult.getStatus() == Status.CLOSED) {
-                        throw new EOFException("SSL handshake status CLOSED during handshake UNWRAP");
-                    }
-                    log.trace("SSLHandshake NEED_UNWRAP channelId {}, handshakeResult {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
-                              channelId, handshakeResult, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
-
-                    //if handshakeStatus completed than fall-through to finished status.
-                    //after handshake is finished there is no data left to read/write in socketChannel.
-                    //so the selector won't invoke this channel if we don't go through the handshakeFinished here.
-                    if (handshakeStatus != HandshakeStatus.FINISHED) {
-                        if (handshakeStatus == HandshakeStatus.NEED_WRAP) {
-                            key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
-                        } else if (handshakeStatus == HandshakeStatus.NEED_UNWRAP) {
-                            key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
-                        }
-                        break;
-                    }
-                case FINISHED:
-                    handshakeFinished();
-                    break;
-                case NOT_HANDSHAKING:
-                    handshakeFinished();
-                    break;
-                default:
-                    throw new IllegalStateException(String.format("Unexpected status [%s]", handshakeStatus));
-            }
-
-        } catch (SSLException e) {
-            handshakeFailure();
-            throw e;
-        }
-    }
-
-    private void renegotiate() throws IOException {
-        if (!enableRenegotiation)
-            throw new SSLHandshakeException("Renegotiation is not supported");
-        handshake();
-    }
-
-
-    /**
-     * Executes the SSLEngine tasks needed.
-     * @return HandshakeStatus
-     */
-    private HandshakeStatus runDelegatedTasks() {
-        for (;;) {
-            Runnable task = delegatedTask();
-            if (task == null) {
-                break;
-            }
-            task.run();
-        }
-        return sslEngine.getHandshakeStatus();
-    }
-
-    /**
-     * Checks if the handshake status is finished
-     * Sets the interestOps for the selectionKey.
-     */
-    private void handshakeFinished() throws IOException {
-        // SSLEngine.getHandshakeStatus is transient and it doesn't record FINISHED status properly.
-        // It can move from FINISHED status to NOT_HANDSHAKING after the handshake is completed.
-        // Hence we also need to check handshakeResult.getHandshakeStatus() if the handshake finished or not
-        if (handshakeResult.getHandshakeStatus() == HandshakeStatus.FINISHED) {
-            //we are complete if we have delivered the last package
-            handshakeComplete = !netWriteBuffer.hasRemaining();
-            //remove OP_WRITE if we are complete, otherwise we still have data to write
-            if (!handshakeComplete)
-                key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
-            else
-                key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
-
-            log.trace("SSLHandshake FINISHED channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {} ",
-                      channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
-        } else {
-            throw new IOException("NOT_HANDSHAKING during handshake");
-        }
-    }
-
-    /**
-    * Performs the WRAP function
-    * @param doWrite boolean
-    * @return SSLEngineResult
-    * @throws IOException
-    */
-    private SSLEngineResult handshakeWrap(boolean doWrite) throws IOException {
-        log.trace("SSLHandshake handshakeWrap {}", channelId);
-        if (netWriteBuffer.hasRemaining())
-            throw new IllegalStateException("handshakeWrap called with netWriteBuffer not empty");
-        //this should never be called with a network buffer that contains data
-        //so we can clear it here.
-        netWriteBuffer.clear();
-        SSLEngineResult result = sslEngine.wrap(emptyBuf, netWriteBuffer);
-        //prepare the results to be written
-        netWriteBuffer.flip();
-        handshakeStatus = result.getHandshakeStatus();
-        if (result.getStatus() == SSLEngineResult.Status.OK &&
-            result.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
-            handshakeStatus = runDelegatedTasks();
-        }
-
-        if (doWrite) flush(netWriteBuffer);
-        return result;
-    }
-
-    /**
-    * Perform handshake unwrap
-    * @param doRead boolean
-    * @return SSLEngineResult
-    * @throws IOException
-    */
-    private SSLEngineResult handshakeUnwrap(boolean doRead) throws IOException {
-        log.trace("SSLHandshake handshakeUnwrap {}", channelId);
-        SSLEngineResult result;
-        boolean cont = false;
-        int read = 0;
-        if (doRead)  {
-            read = socketChannel.read(netReadBuffer);
-            if (read == -1) throw new EOFException("EOF during handshake.");
-        }
-        do {
-            //prepare the buffer with the incoming data
-            netReadBuffer.flip();
-            result = sslEngine.unwrap(netReadBuffer, appReadBuffer);
-            netReadBuffer.compact();
-            handshakeStatus = result.getHandshakeStatus();
-            if (result.getStatus() == SSLEngineResult.Status.OK &&
-                result.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
-                handshakeStatus = runDelegatedTasks();
-            }
-            cont = result.getStatus() == SSLEngineResult.Status.OK &&
-                handshakeStatus == HandshakeStatus.NEED_UNWRAP;
-            log.trace("SSLHandshake handshakeUnwrap: handshakeStatus {} status {}", handshakeStatus, result.getStatus());
-        } while (netReadBuffer.position() != 0 && cont);
-
-        return result;
-    }
-
-
-    /**
-    * Reads a sequence of bytes from this channel into the given buffer.
-    *
-    * @param dst The buffer into which bytes are to be transferred
-    * @return The number of bytes read, possible zero or -1 if the channel has reached end-of-stream
-    * @throws IOException if some other I/O error occurs
-    */
-    @Override
-    public int read(ByteBuffer dst) throws IOException {
-        if (closing) return -1;
-        int read = 0;
-        if (!handshakeComplete) return read;
-
-        //if we have unread decrypted data in appReadBuffer read that into dst buffer.
-        if (appReadBuffer.position() > 0) {
-            read = readFromAppBuffer(dst);
-        }
-
-        if (dst.remaining() > 0) {
-            netReadBuffer = Utils.ensureCapacity(netReadBuffer, netReadBufferSize());
-            if (netReadBuffer.remaining() > 0) {
-                int netread = socketChannel.read(netReadBuffer);
-                if (netread == 0) return netread;
-                else if (netread < 0) throw new EOFException("EOF during read");
-            }
-            do {
-                netReadBuffer.flip();
-                SSLEngineResult unwrapResult = sslEngine.unwrap(netReadBuffer, appReadBuffer);
-                netReadBuffer.compact();
-                // handle ssl renegotiation.
-                if (unwrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING && unwrapResult.getStatus() == Status.OK) {
-                    log.trace("SSLChannel Read begin renegotiation channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
-                              channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
-                    renegotiate();
-                    break;
-                }
-
-                if (unwrapResult.getStatus() == Status.OK) {
-                    read += readFromAppBuffer(dst);
-                } else if (unwrapResult.getStatus() == Status.BUFFER_OVERFLOW) {
-                    int currentApplicationBufferSize = applicationBufferSize();
-                    appReadBuffer = Utils.ensureCapacity(appReadBuffer, currentApplicationBufferSize);
-                    if (appReadBuffer.position() >= currentApplicationBufferSize) {
-                        throw new IllegalStateException("Buffer overflow when available data size (" + appReadBuffer.position() +
-                                                        ") >= application buffer size (" + currentApplicationBufferSize + ")");
-                    }
-
-                    // appReadBuffer will extended upto currentApplicationBufferSize
-                    // we need to read the existing content into dst before we can do unwrap again. If there are no space in dst
-                    // we can break here.
-                    if (dst.hasRemaining())
-                        read += readFromAppBuffer(dst);
-                    else
-                        break;
-                } else if (unwrapResult.getStatus() == Status.BUFFER_UNDERFLOW) {
-                    int currentNetReadBufferSize = netReadBufferSize();
-                    netReadBuffer = Utils.ensureCapacity(netReadBuffer, currentNetReadBufferSize);
-                    if (netReadBuffer.position() >= currentNetReadBufferSize) {
-                        throw new IllegalStateException("Buffer underflow when available data size (" + netReadBuffer.position() +
-                                                        ") > packet buffer size (" + currentNetReadBufferSize + ")");
-                    }
-                    break;
-                } else if (unwrapResult.getStatus() == Status.CLOSED) {
-                    throw new EOFException();
-                }
-            } while (netReadBuffer.position() != 0);
-        }
-        return read;
-    }
-
-
-    /**
-     * Reads a sequence of bytes from this channel into the given buffers.
-     *
-     * @param dsts - The buffers into which bytes are to be transferred.
-     * @return The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream.
-     * @throws IOException if some other I/O error occurs
-     */
-    @Override
-    public long read(ByteBuffer[] dsts) throws IOException {
-        return read(dsts, 0, dsts.length);
-    }
-
-
-    /**
-     * Reads a sequence of bytes from this channel into a subsequence of the given buffers.
-     * @param dsts - The buffers into which bytes are to be transferred
-     * @param offset - The offset within the buffer array of the first buffer into which bytes are to be transferred; must be non-negative and no larger than dsts.length.
-     * @param length - The maximum number of buffers to be accessed; must be non-negative and no larger than dsts.length - offset
-     * @return The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream.
-     * @throws IOException if some other I/O error occurs
-     */
-    @Override
-    public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
-        if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
-            throw new IndexOutOfBoundsException();
-
-        int totalRead = 0;
-        int i = offset;
-        while (i < length) {
-            if (dsts[i].hasRemaining()) {
-                int read = read(dsts[i]);
-                if (read > 0)
-                    totalRead += read;
-                else
-                    break;
-            }
-            if (!dsts[i].hasRemaining()) {
-                i++;
-            }
-        }
-        return totalRead;
-    }
-
-
-    /**
-    * Writes a sequence of bytes to this channel from the given buffer.
-    *
-    * @param src The buffer from which bytes are to be retrieved
-    * @return The number of bytes of src consumed by SSLEngine.wrap; zero if the handshake is not complete,
-    *         if flush(netWriteBuffer) reports false, or if a renegotiation was started
-    * @throws IOException If some other I/O error occurs; an EOFException is thrown when wrap reports CLOSED
-    * @throws IllegalStateException if the channel is closing, if wrap reports BUFFER_UNDERFLOW, or if wrap reports
-    *         BUFFER_OVERFLOW while the net write buffer's limit is already at least netWriteBufferSize()
-    */
-    @Override
-    public int write(ByteBuffer src) throws IOException {
-        int written = 0;
-        if (closing) throw new IllegalStateException("Channel is in closing state");
-        if (!handshakeComplete) return written;
-
-        // netWriteBuffer is cleared below, so do not wrap new data unless the flush succeeded
-        if (!flush(netWriteBuffer))
-            return written;
-
-        netWriteBuffer.clear();
-        SSLEngineResult wrapResult = sslEngine.wrap(src, netWriteBuffer);
-        netWriteBuffer.flip();
-
-        //handle ssl renegotiation
-        // NOTE(review): written is still 0 here even if wrap consumed bytes from src - confirm callers tolerate this
-        if (wrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING && wrapResult.getStatus() == Status.OK) {
-            renegotiate();
-            return written;
-        }
-
-        if (wrapResult.getStatus() == Status.OK) {
-            written = wrapResult.bytesConsumed();
-            flush(netWriteBuffer);
-        } else if (wrapResult.getStatus() == Status.BUFFER_OVERFLOW) {
-            int currentNetWriteBufferSize = netWriteBufferSize();
-            netWriteBuffer.compact();
-            netWriteBuffer = Utils.ensureCapacity(netWriteBuffer, currentNetWriteBufferSize);
-            netWriteBuffer.flip();
-            if (netWriteBuffer.limit() >= currentNetWriteBufferSize)
-                throw new IllegalStateException("SSL BUFFER_OVERFLOW when available data size (" + netWriteBuffer.limit() + ") >= network buffer size (" + currentNetWriteBufferSize + ")");
-        } else if (wrapResult.getStatus() == Status.BUFFER_UNDERFLOW) {
-            throw new IllegalStateException("SSL BUFFER_UNDERFLOW during write");
-        } else if (wrapResult.getStatus() == Status.CLOSED) {
-            throw new EOFException();
-        }
-        return written;
-    }
-
-    /**
-    * Writes a sequence of bytes to this channel from the subsequence of the given buffers.
-    * Buffers srcs[offset] .. srcs[offset + length - 1] are written in order; writing stops at the first buffer that cannot be fully written.
-    *
-    * @param srcs The buffers from which bytes are to be retrieved
-    * @param offset The offset within the buffer array of the first buffer from which bytes are to be retrieved; must be non-negative and no larger than srcs.length.
-    * @param length - The maximum number of buffers to be accessed; must be non-negative and no larger than srcs.length - offset.
-    * @return The total number of bytes written across the buffers, possibly zero.
-    * @throws IOException If some other I/O error occurs
-    * @throws IndexOutOfBoundsException if offset and length do not describe a valid range of srcs
-    */
-    @Override
-    public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
-        if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
-            throw new IndexOutOfBoundsException();
-        int totalWritten = 0;
-        int end = offset + length; // exclusive upper index; bounding the loop by length alone skipped buffers whenever offset > 0
-        int i = offset;
-        while (i < end) {
-            if (srcs[i].hasRemaining() || hasPendingWrites()) {
-                int written = write(srcs[i]);
-                if (written > 0)
-                    totalWritten += written;
-            }
-            if (!srcs[i].hasRemaining() && !hasPendingWrites()) {
-                i++;
-            } else {
-                // if we are unable to write the current buffer to socketChannel we should break,
-                // as we might have reached max socket send buffer size.
-                break;
-            }
-        }
-        return totalWritten;
-    }
-
-    /**
-    * Writes a sequence of bytes to this channel from the given buffers.
-    * Equivalent to write(srcs, 0, srcs.length).
-    *
-    * @param srcs The buffers from which bytes are to be retrieved
-    * @return The number of bytes written as reported by write(srcs, 0, srcs.length), possibly zero.
-    * @throws IOException If some other I/O error occurs
-    */
-    @Override
-    public long write(ByteBuffer[] srcs) throws IOException {
-        return write(srcs, 0, srcs.length);
-    }
-
-
-    /**
-     * SSLSession's peerPrincipal for the remote host.
-     * @return the peer principal of the SSLEngine's session, or KafkaPrincipal.ANONYMOUS
-     *         (after logging a warning) when the peer is not verified
-     */
-    public Principal peerPrincipal() throws IOException {
-        try {
-            return sslEngine.getSession().getPeerPrincipal();
-        } catch (SSLPeerUnverifiedException se) {
-            // an unverified peer is not treated as an error; fall back to the anonymous principal
-            log.warn("SSL peer is not authenticated, returning ANONYMOUS instead");
-            return KafkaPrincipal.ANONYMOUS;
-        }
-    }
-
-    /**
-     * Returns the SSLSession currently held by the SSLEngine.
-     * NOTE(review): the signature declares IllegalStateException, but this method performs no
-     * handshake check of its own and simply delegates to SSLEngine.getSession() - confirm the intended contract.
-     * @return the SSLEngine's SSLSession
-     */
-    public SSLSession sslSession() throws IllegalStateException {
-        return sslEngine.getSession();
-    }
-
-    /**
-     * Adds interestOps to SelectionKey of the TransportLayer
-     * @param ops SelectionKey interestOps
-     */
-    @Override
-    public void addInterestOps(int ops) {
-        if (!key.isValid())
-            throw new CancelledKeyException();
-        else if (!handshakeComplete)
-            throw new IllegalStateException("handshake is not completed");
-
-        key.interestOps(key.interestOps() | ops);
-    }
-
-    /**
-     * removes interestOps to SelectionKey of the TransportLayer
-     * @param ops SelectionKey interestOps
-     */
-    @Override
-    public void removeInterestOps(int ops) {
-        if (!key.isValid())
-            throw new CancelledKeyException();
-        else if (!handshakeComplete)
-            throw new IllegalStateException("handshake is not completed");
-
-        key.interestOps(key.interestOps() & ~ops);
-    }
-
-
-    /**
-     * Returns the result of SSLEngine.getDelegatedTask() for this layer's SSLEngine.
-     * @return the Runnable handed back by the SSLEngine
-     */
-    protected Runnable delegatedTask() {
-        return sslEngine.getDelegatedTask();
-    }
-
-    /**
-     * transfers appReadBuffer contents (decrypted data) into dst bytebuffer
-     * @param dst ByteBuffer
-     */
-    private int readFromAppBuffer(ByteBuffer dst) {
-        appReadBuffer.flip();
-        int remaining = Math.min(appReadBuffer.remaining(), dst.remaining());
-        if (remaining > 0) {
-            int limit = appReadBuffer.limit();
-            appReadBuffer.limit(appReadBuffer.position() + remaining);
-            dst.put(appReadBuffer);
-            appReadBuffer.limit(limit);
-        }
-        appReadBuffer.compact();
-        return remaining;
-    }
-
-    /**
-     * @return the packet buffer size reported by the SSLEngine's current session
-     */
-    protected int netReadBufferSize() {
-        return sslEngine.getSession().getPacketBufferSize();
-    }
-    
-    /**
-     * @return the packet buffer size reported by the SSLEngine's current session
-     */
-    protected int netWriteBufferSize() {
-        return sslEngine.getSession().getPacketBufferSize();
-    }
-
-    /**
-     * @return the application buffer size reported by the SSLEngine's current session
-     */
-    protected int applicationBufferSize() {
-        return sslEngine.getSession().getApplicationBufferSize();
-    }
-    
-    /**
-     * @return the netReadBuffer field itself (not a copy)
-     */
-    protected ByteBuffer netReadBuffer() {
-        return netReadBuffer;
-    }
-
-    /**
-     * Closes the SSLEngine's outbound and inbound sides; an SSLException from closeInbound is logged at debug level and not propagated.
-     */
-    private void handshakeFailure() {
-        //Release all resources such as internal buffers that SSLEngine is managing
-        sslEngine.closeOutbound();
-        try {
-            sslEngine.closeInbound();
-        } catch (SSLException e) {
-            // best effort: a failure to close the inbound side is only logged
-            log.debug("SSLEngine.closeInBound() raised an exception.", e);
-        }
-    }
-
-    /**
-     * Reports whether OP_READ is absent from the interest set of a still-valid SelectionKey.
-     * @return true only if the key is valid and OP_READ is not among its interest ops
-     */
-    @Override
-    public boolean isMute() {
-        if (!key.isValid())
-            return false;
-        return (key.interestOps() & SelectionKey.OP_READ) == 0;
-    }
-
-    /**
-     * Transfers bytes from the given file channel into this channel by calling
-     * fileChannel.transferTo(position, count, this).
-     * @param fileChannel the source file channel
-     * @param position the position in the file at which the transfer starts
-     * @param count the maximum number of bytes to transfer
-     * @return the value returned by FileChannel.transferTo
-     * @throws IOException if the transfer fails
-     */
-    @Override
-    public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
-        return fileChannel.transferTo(position, count, this);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index 53953c5..148e549 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -25,9 +25,9 @@ import org.apache.kafka.common.security.kerberos.KerberosNameParser;
 import org.apache.kafka.common.security.kerberos.LoginManager;
 import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator;
 import org.apache.kafka.common.security.authenticator.SaslServerAuthenticator;
-import org.apache.kafka.common.security.ssl.SSLFactory;
+import org.apache.kafka.common.security.ssl.SslFactory;
 import org.apache.kafka.common.protocol.SecurityProtocol;
-import org.apache.kafka.common.config.SSLConfigs;
+import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.common.KafkaException;
 
@@ -43,7 +43,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
 
     private LoginManager loginManager;
     private PrincipalBuilder principalBuilder;
-    private SSLFactory sslFactory;
+    private SslFactory sslFactory;
     private Map<String, ?> configs;
     private KerberosNameParser kerberosNameParser;
 
@@ -57,7 +57,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
         try {
             this.configs = configs;
             this.loginManager = LoginManager.acquireLoginManager(loginType, configs);
-            this.principalBuilder = (PrincipalBuilder) Utils.newInstance((Class<?>) configs.get(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG));
+            this.principalBuilder = (PrincipalBuilder) Utils.newInstance((Class<?>) configs.get(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG));
             this.principalBuilder.configure(configs);
 
             String defaultRealm;
@@ -69,7 +69,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
             kerberosNameParser = new KerberosNameParser(defaultRealm, (List<String>) configs.get(SaslConfigs.AUTH_TO_LOCAL));
 
             if (this.securityProtocol == SecurityProtocol.SASL_SSL) {
-                this.sslFactory = new SSLFactory(mode);
+                this.sslFactory = new SslFactory(mode);
                 this.sslFactory.configure(this.configs);
             }
         } catch (Exception e) {
@@ -102,8 +102,8 @@ public class SaslChannelBuilder implements ChannelBuilder {
 
     protected TransportLayer buildTransportLayer(String id, SelectionKey key, SocketChannel socketChannel) throws IOException {
         if (this.securityProtocol == SecurityProtocol.SASL_SSL) {
-            return SSLTransportLayer.create(id, key,
-                sslFactory.createSSLEngine(socketChannel.socket().getInetAddress().getHostName(),
+            return SslTransportLayer.create(id, key,
+                sslFactory.createSslEngine(socketChannel.socket().getInetAddress().getHostName(),
                 socketChannel.socket().getPort()));
         } else {
             return new PlaintextTransportLayer(key);

http://git-wip-us.apache.org/repos/asf/kafka/blob/16f194b2/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
new file mode 100644
index 0000000..8edd37e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.Map;
+
+import org.apache.kafka.common.security.auth.PrincipalBuilder;
+import org.apache.kafka.common.security.ssl.SslFactory;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.KafkaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ChannelBuilder that creates KafkaChannels whose transport layer is an SslTransportLayer.
+ * buildChannel relies on the SslFactory and PrincipalBuilder created in configure.
+ */
+public class SslChannelBuilder implements ChannelBuilder {
+    private static final Logger log = LoggerFactory.getLogger(SslChannelBuilder.class);
+    private final Mode mode;
+    private SslFactory sslFactory;
+    private PrincipalBuilder principalBuilder;
+    private Map<String, ?> configs;
+
+    /**
+     * @param mode the Mode handed to the SslFactory created in configure
+     */
+    public SslChannelBuilder(Mode mode) {
+        this.mode = mode;
+    }
+
+    /**
+     * Creates and configures the SslFactory, then instantiates and configures the PrincipalBuilder
+     * class found under SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG.
+     * @param configs configuration passed to the factory, the principal builder and each authenticator
+     * @throws KafkaException wrapping any exception raised while creating or configuring them
+     */
+    public void configure(Map<String, ?> configs) throws KafkaException {
+        try {
+            this.configs = configs;
+            this.sslFactory = new SslFactory(mode);
+            this.sslFactory.configure(this.configs);
+            this.principalBuilder = (PrincipalBuilder) Utils.newInstance((Class<?>) configs.get(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG));
+            this.principalBuilder.configure(this.configs);
+        } catch (Exception e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    /**
+     * Builds a KafkaChannel made of an SslTransportLayer and a DefaultAuthenticator.
+     * @param id the channel id
+     * @param key the SelectionKey whose channel is the underlying SocketChannel
+     * @param maxReceiveSize passed through to the KafkaChannel constructor
+     * @return the newly created channel
+     * @throws KafkaException wrapping any exception raised while building the channel (also logged at info level)
+     */
+    public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException {
+        try {
+            SslTransportLayer transportLayer = buildTransportLayer(sslFactory, id, key);
+            Authenticator authenticator = new DefaultAuthenticator();
+            authenticator.configure(transportLayer, this.principalBuilder, this.configs);
+            return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
+        } catch (Exception e) {
+            log.info("Failed to create channel due to ", e);
+            throw new KafkaException(e);
+        }
+    }
+
+    /**
+     * Closes the principal builder. Does nothing if configure was never called or failed
+     * before the principal builder was created.
+     */
+    public void close()  {
+        // principalBuilder stays null until configure gets far enough to create it
+        if (this.principalBuilder != null)
+            this.principalBuilder.close();
+    }
+
+    /**
+     * Creates the SslTransportLayer for the SocketChannel attached to the given key, using an SSLEngine
+     * created from the remote host name and port of that socket.
+     * @param sslFactory factory used to create the SSLEngine
+     * @param id the channel id
+     * @param key the SelectionKey whose channel is the underlying SocketChannel
+     * @return the new transport layer
+     * @throws IOException if the transport layer cannot be created
+     */
+    protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, String id, SelectionKey key) throws IOException {
+        SocketChannel socketChannel = (SocketChannel) key.channel();
+        return SslTransportLayer.create(id, key,
+            sslFactory.createSslEngine(socketChannel.socket().getInetAddress().getHostName(),
+            socketChannel.socket().getPort()));
+    }
+}