Posted to commits@kafka.apache.org by ju...@apache.org on 2015/08/19 06:53:35 UTC

[1/4] kafka git commit: kafka-1690; Add SSL support to Kafka Broker, Producer and Consumer; patched by Sriharsha Chintalapani; reviewed by Rajini Sivaram, Joel Koshy, Michael Herstine, Ismael Juma, Dong Lin, Jiangjie Qin and Jun Rao

Repository: kafka
Updated Branches:
  refs/heads/trunk 503bd3664 -> 9e2c683f5


http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index c39402c..d547a01 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -1,19 +1,19 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
 
 package kafka.server
 
@@ -26,14 +26,17 @@ import kafka.consumer.ConsumerConfig
 import kafka.message.{BrokerCompressionCodec, CompressionCodec, Message, MessageSet}
 import kafka.utils.CoreUtils
 import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.config.SSLConfigs
 import org.apache.kafka.common.config.ConfigDef.Importance._
 import org.apache.kafka.common.config.ConfigDef.Range._
 import org.apache.kafka.common.config.ConfigDef.Type._
+
 import org.apache.kafka.common.config.{ConfigException, AbstractConfig, ConfigDef}
 import org.apache.kafka.common.metrics.MetricsReporter
 import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.security.auth.PrincipalBuilder
+import scala.collection.{mutable, immutable, JavaConversions, Map}
 
-import scala.collection.{Map, immutable}
 
 object Defaults {
   /** ********* Zookeeper Configuration ***********/
@@ -151,6 +154,25 @@ object Defaults {
   val MetricNumSamples = 2
   val MetricSampleWindowMs = 30000
   val MetricReporterClasses = ""
+
+  /** ********* SSL configuration ***********/
+  val PrincipalBuilderClass = SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS
+  val SSLProtocol = SSLConfigs.DEFAULT_SSL_PROTOCOL
+  val SSLEnabledProtocols = SSLConfigs.DEFAULT_ENABLED_PROTOCOLS
+  val SSLKeystoreType = SSLConfigs.DEFAULT_SSL_KEYSTORE_TYPE
+  val SSLKeystoreLocation = "/tmp/ssl.keystore.jks"
+  val SSLKeystorePassword = "keystore_password"
+  val SSLKeyPassword = "key_password"
+  val SSLTruststoreType = SSLConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
+  val SSLTruststoreLocation = SSLConfigs.DEFAULT_TRUSTSTORE_LOCATION
+  val SSLTruststorePassword = SSLConfigs.DEFAULT_TRUSTSTORE_PASSWORD
+  val SSLKeyManagerAlgorithm = SSLConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM
+  val SSLTrustManagerAlgorithm = SSLConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM
+  val SSLClientAuthRequired = "required"
+  val SSLClientAuthRequested = "requested"
+  val SSLClientAuthNone = "none"
+  val SSLClientAuth = SSLClientAuthNone
+
 }
 
 object KafkaConfig {
@@ -278,6 +300,25 @@ object KafkaConfig {
   val MetricNumSamplesProp: String = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG
   val MetricReporterClassesProp: String = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG
 
+  /** ********* SSL Configuration ****************/
+  val PrincipalBuilderClassProp = SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG
+  val SSLProtocolProp = SSLConfigs.SSL_PROTOCOL_CONFIG
+  val SSLProviderProp = SSLConfigs.SSL_PROVIDER_CONFIG
+  val SSLCipherSuitesProp = SSLConfigs.SSL_CIPHER_SUITES_CONFIG
+  val SSLEnabledProtocolsProp = SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG
+  val SSLKeystoreTypeProp = SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG
+  val SSLKeystoreLocationProp = SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG
+  val SSLKeystorePasswordProp = SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG
+  val SSLKeyPasswordProp = SSLConfigs.SSL_KEY_PASSWORD_CONFIG
+  val SSLTruststoreTypeProp = SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG
+  val SSLTruststoreLocationProp = SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG
+  val SSLTruststorePasswordProp = SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG
+  val SSLKeyManagerAlgorithmProp = SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG
+  val SSLTrustManagerAlgorithmProp = SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG
+  val SSLEndpointIdentificationAlgorithmProp = SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG
+  val SSLClientAuthProp = SSLConfigs.SSL_CLIENT_AUTH_CONFIG
+
+
   /* Documentation */
   /** ********* Zookeeper Configuration ***********/
   val ZkConnectDoc = "Zookeeper host string"
@@ -287,8 +328,8 @@ object KafkaConfig {
   /** ********* General Configuration ***********/
   val MaxReservedBrokerIdDoc = "Max number that can be used for a broker.id"
   val BrokerIdDoc = "The broker id for this server. " +
-    "To avoid conflicts between zookeeper generated brokerId and user's config.brokerId " +
-    "added MaxReservedBrokerId and zookeeper sequence starts from MaxReservedBrokerId + 1."
+  "To avoid conflicts between zookeeper generated brokerId and user's config.brokerId " +
+  "added MaxReservedBrokerId and zookeeper sequence starts from MaxReservedBrokerId + 1."
   val MessageMaxBytesDoc = "The maximum size of message that the server can receive"
   val NumNetworkThreadsDoc = "the number of network threads that the server uses for handling network requests"
   val NumIoThreadsDoc = "The number of io threads that the server uses for carrying out network requests"
@@ -298,21 +339,21 @@ object KafkaConfig {
   val PortDoc = "the port to listen and accept connections on"
   val HostNameDoc = "hostname of broker. If this is set, it will only bind to this address. If this is not set, it will bind to all interfaces"
   val ListenersDoc = "Listener List - Comma-separated list of URIs we will listen on and their protocols.\n" +
-          " Specify hostname as 0.0.0.0 to bind to all interfaces.\n" +
-          " Leave hostname empty to bind to default interface.\n" +
-          " Examples of legal listener lists:\n" +
-          " PLAINTEXT://myhost:9092,TRACE://:9091\n" +
-          " PLAINTEXT://0.0.0.0:9092, TRACE://localhost:9093\n"
+  " Specify hostname as 0.0.0.0 to bind to all interfaces.\n" +
+  " Leave hostname empty to bind to default interface.\n" +
+  " Examples of legal listener lists:\n" +
+  " PLAINTEXT://myhost:9092,TRACE://:9091\n" +
+  " PLAINTEXT://0.0.0.0:9092, TRACE://localhost:9093\n"
   val AdvertisedHostNameDoc = "Hostname to publish to ZooKeeper for clients to use. In IaaS environments, this may " +
-    "need to be different from the interface to which the broker binds. If this is not set, " +
-    "it will use the value for \"host.name\" if configured. Otherwise " +
-    "it will use the value returned from java.net.InetAddress.getCanonicalHostName()."
+  "need to be different from the interface to which the broker binds. If this is not set, " +
+  "it will use the value for \"host.name\" if configured. Otherwise " +
+  "it will use the value returned from java.net.InetAddress.getCanonicalHostName()."
   val AdvertisedPortDoc = "The port to publish to ZooKeeper for clients to use. In IaaS environments, this may " +
-    "need to be different from the port to which the broker binds. If this is not set, " +
-    "it will publish the same port that the broker binds to."
+  "need to be different from the port to which the broker binds. If this is not set, " +
+  "it will publish the same port that the broker binds to."
   val AdvertisedListenersDoc = "Listeners to publish to ZooKeeper for clients to use, if different than the listeners above." +
-          " In IaaS environments, this may need to be different from the interface to which the broker binds." +
-          " If this is not set, the value for \"listeners\" will be used."
+  " In IaaS environments, this may need to be different from the interface to which the broker binds." +
+  " If this is not set, the value for \"listeners\" will be used."
   val SocketSendBufferBytesDoc = "The SO_SNDBUF buffer of the socket sever sockets"
   val SocketReceiveBufferBytesDoc = "The SO_RCVBUF buffer of the socket sever sockets"
   val SocketRequestMaxBytesDoc = "The maximum number of bytes in a socket request"
@@ -342,7 +383,7 @@ object KafkaConfig {
   val LogCleanerDedupeBufferSizeDoc = "The total memory used for log deduplication across all cleaner threads"
   val LogCleanerIoBufferSizeDoc = "The total memory used for log cleaner I/O buffers across all cleaner threads"
   val LogCleanerDedupeBufferLoadFactorDoc = "Log cleaner dedupe buffer load factor. The percentage full the dedupe buffer can become. A higher value " +
-    "will allow more log to be cleaned at once but will lead to more hash collisions"
+  "will allow more log to be cleaned at once but will lead to more hash collisions"
   val LogCleanerBackoffMsDoc = "The amount of time to sleep when there are no logs to clean"
   val LogCleanerMinCleanRatioDoc = "The minimum ratio of dirty log to total log for a log to eligible for cleaning"
   val LogCleanerEnableDoc = "Should we enable log cleaning?"
@@ -363,15 +404,15 @@ object KafkaConfig {
   val ControllerMessageQueueSizeDoc = "The buffer size for controller-to-broker-channels"
   val DefaultReplicationFactorDoc = "default replication factors for automatically created topics"
   val ReplicaLagTimeMaxMsDoc = "If a follower hasn't sent any fetch requests or hasn't consumed up to the leaders log end offset for at least this time," +
-    " the leader will remove the follower from isr"
+  " the leader will remove the follower from isr"
   val ReplicaSocketTimeoutMsDoc = "The socket timeout for network requests. Its value should be at least replica.fetch.wait.max.ms"
   val ReplicaSocketReceiveBufferBytesDoc = "The socket receive buffer for network requests"
   val ReplicaFetchMaxBytesDoc = "The number of byes of messages to attempt to fetch"
   val ReplicaFetchWaitMaxMsDoc = "max wait time for each fetcher request issued by follower replicas. This value should always be less than the " +
-    "replica.lag.time.max.ms at all times to prevent frequent shrinking of ISR for low throughput topics"
+  "replica.lag.time.max.ms at all times to prevent frequent shrinking of ISR for low throughput topics"
   val ReplicaFetchMinBytesDoc = "Minimum bytes expected for each fetch response. If not enough bytes, wait up to replicaMaxWaitTimeMs"
   val NumReplicaFetchersDoc = "Number of fetcher threads used to replicate messages from a source broker. " +
-    "Increasing this value can increase the degree of I/O parallelism in the follower broker."
+  "Increasing this value can increase the degree of I/O parallelism in the follower broker."
   val ReplicaFetchBackoffMsDoc = "The amount of time to sleep when fetch partition error occurs."
   val ReplicaHighWatermarkCheckpointIntervalMsDoc = "The frequency with which the high watermark is saved out to disk"
   val FetchPurgatoryPurgeIntervalRequestsDoc = "The purge interval (in number of requests) of the fetch request purgatory"
@@ -382,8 +423,8 @@ object KafkaConfig {
   val UncleanLeaderElectionEnableDoc = "Indicates whether to enable replicas not in the ISR set to be elected as leader as a last resort, even though doing so may result in data loss"
   val InterBrokerSecurityProtocolDoc = "Security protocol used to communicate between brokers. Defaults to plain text."
   val InterBrokerProtocolVersionDoc = "Specify which version of the inter-broker protocol will be used.\n" +
-          " This is typically bumped after all brokers were upgraded to a new version.\n" +
-          " Example of some valid values are: 0.8.0, 0.8.1, 0.8.1.1, 0.8.2, 0.8.2.0, 0.8.2.1, 0.8.3, 0.8.3.0. Check ApiVersion for the full list."
+  " This is typically bumped after all brokers were upgraded to a new version.\n" +
+  " Example of some valid values are: 0.8.0, 0.8.1, 0.8.1.1, 0.8.2, 0.8.2.0, 0.8.2.1, 0.8.3, 0.8.3.0. Check ApiVersion for the full list."
   /** ********* Controlled shutdown configuration ***********/
   val ControlledShutdownMaxRetriesDoc = "Controlled shutdown can fail for multiple reasons. This determines the number of retries when such failure happens"
   val ControlledShutdownRetryBackoffMsDoc = "Before each retry, the system needs time to recover from the state that caused the previous failure (Controller fail over, replica lag etc). This config determines the amount of time to wait before retrying."
@@ -395,17 +436,17 @@ object KafkaConfig {
   val OffsetMetadataMaxSizeDoc = "The maximum size for a metadata entry associated with an offset commit"
   val OffsetsLoadBufferSizeDoc = "Batch size for reading from the offsets segments when loading offsets into the cache."
   val OffsetsTopicReplicationFactorDoc = "The replication factor for the offsets topic (set higher to ensure availability). " +
-    "To ensure that the effective replication factor of the offsets topic is the configured value, " +
-    "the number of alive brokers has to be at least the replication factor at the time of the " +
-    "first request for the offsets topic. If not, either the offsets topic creation will fail or " +
-    "it will get a replication factor of min(alive brokers, configured replication factor)"
+  "To ensure that the effective replication factor of the offsets topic is the configured value, " +
+  "the number of alive brokers has to be at least the replication factor at the time of the " +
+  "first request for the offsets topic. If not, either the offsets topic creation will fail or " +
+  "it will get a replication factor of min(alive brokers, configured replication factor)"
   val OffsetsTopicPartitionsDoc = "The number of partitions for the offset commit topic (should not change after deployment)"
   val OffsetsTopicSegmentBytesDoc = "The offsets topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads"
   val OffsetsTopicCompressionCodecDoc = "Compression codec for the offsets topic - compression may be used to achieve \"atomic\" commits"
   val OffsetsRetentionMinutesDoc = "Log retention window in minutes for offsets topic"
   val OffsetsRetentionCheckIntervalMsDoc = "Frequency at which to check for stale offsets"
   val OffsetCommitTimeoutMsDoc = "Offset commit will be delayed until all replicas for the offsets topic receive the commit " +
-    "or this timeout is reached. This is similar to the producer request timeout."
+  "or this timeout is reached. This is similar to the producer request timeout."
   val OffsetCommitRequiredAcksDoc = "The required acks before the commit can be accepted. In general, the default (-1) should not be overridden"
   /** ********* Quota Configuration ***********/
   val ProducerQuotaBytesPerSecondDefaultDoc = "Any producer distinguished by clientId will get throttled if it produces more bytes than this value per-second"
@@ -419,14 +460,31 @@ object KafkaConfig {
 
   val DeleteTopicEnableDoc = "Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off"
   val CompressionTypeDoc = "Specify the final compression type for a given topic. This configuration accepts the standard compression codecs " +
-    "('gzip', 'snappy', lz4). It additionally accepts 'uncompressed' which is equivalent to no compression; and " +
-    "'producer' which means retain the original compression codec set by the producer."
+  "('gzip', 'snappy', lz4). It additionally accepts 'uncompressed' which is equivalent to no compression; and " +
+  "'producer' which means retain the original compression codec set by the producer."
 
   /** ********* Kafka Metrics Configuration ***********/
   val MetricSampleWindowMsDoc = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC
   val MetricNumSamplesDoc = CommonClientConfigs.METRICS_NUM_SAMPLES_DOC
   val MetricReporterClassesDoc = CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC
 
+  /** ********* SSL Configuration ****************/
+  val PrincipalBuilderClassDoc = SSLConfigs.PRINCIPAL_BUILDER_CLASS_DOC
+  val SSLProtocolDoc = SSLConfigs.SSL_PROTOCOL_DOC
+  val SSLProviderDoc = SSLConfigs.SSL_PROVIDER_DOC
+  val SSLCipherSuitesDoc = SSLConfigs.SSL_CIPHER_SUITES_DOC
+  val SSLEnabledProtocolsDoc = SSLConfigs.SSL_ENABLED_PROTOCOLS_DOC
+  val SSLKeystoreTypeDoc = SSLConfigs.SSL_KEYSTORE_TYPE_DOC
+  val SSLKeystoreLocationDoc = SSLConfigs.SSL_KEYSTORE_LOCATION_DOC
+  val SSLKeystorePasswordDoc = SSLConfigs.SSL_KEYSTORE_PASSWORD_DOC
+  val SSLKeyPasswordDoc = SSLConfigs.SSL_KEY_PASSWORD_DOC
+  val SSLTruststoreTypeDoc = SSLConfigs.SSL_TRUSTSTORE_TYPE_DOC
+  val SSLTruststorePasswordDoc = SSLConfigs.SSL_TRUSTSTORE_PASSWORD_DOC
+  val SSLTruststoreLocationDoc = SSLConfigs.SSL_TRUSTSTORE_LOCATION_DOC
+  val SSLKeyManagerAlgorithmDoc = SSLConfigs.SSL_KEYMANAGER_ALGORITHM_DOC
+  val SSLTrustManagerAlgorithmDoc = SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC
+  val SSLEndpointIdentificationAlgorithmDoc = SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC
+  val SSLClientAuthDoc = SSLConfigs.SSL_CLIENT_AUTH_DOC
 
   private val configDef = {
     import ConfigDef.Range._
@@ -561,6 +619,24 @@ object KafkaConfig {
       .define(ConsumerQuotaBytesPerSecondOverridesProp, STRING, Defaults.ConsumerQuotaBytesPerSecondOverrides, HIGH, ConsumerQuotaBytesPerSecondOverridesDoc)
       .define(NumQuotaSamplesProp, INT, Defaults.NumQuotaSamples, atLeast(1), LOW, NumQuotaSamplesDoc)
       .define(QuotaWindowSizeSecondsProp, INT, Defaults.QuotaWindowSizeSeconds, atLeast(1), LOW, QuotaWindowSizeSecondsDoc)
+
+
+      /** ********* SSL Configuration ****************/
+      .define(PrincipalBuilderClassProp, STRING, Defaults.PrincipalBuilderClass, MEDIUM, PrincipalBuilderClassDoc)
+      .define(SSLProtocolProp, STRING, Defaults.SSLProtocol, MEDIUM, SSLProtocolDoc)
+      .define(SSLProviderProp, STRING, MEDIUM, SSLProviderDoc, false)
+      .define(SSLEnabledProtocolsProp, LIST, Defaults.SSLEnabledProtocols, MEDIUM, SSLEnabledProtocolsDoc)
+      .define(SSLKeystoreTypeProp, STRING, Defaults.SSLKeystoreType, MEDIUM, SSLKeystoreTypeDoc)
+      .define(SSLKeystoreLocationProp, STRING, Defaults.SSLKeystoreLocation, MEDIUM, SSLKeystoreLocationDoc)
+      .define(SSLKeystorePasswordProp, STRING, Defaults.SSLKeystorePassword, MEDIUM, SSLKeystorePasswordDoc)
+      .define(SSLKeyPasswordProp, STRING, Defaults.SSLKeyPassword, MEDIUM, SSLKeyPasswordDoc)
+      .define(SSLTruststoreTypeProp, STRING, Defaults.SSLTruststoreType, MEDIUM, SSLTruststoreTypeDoc)
+      .define(SSLTruststoreLocationProp, STRING, Defaults.SSLTruststoreLocation, MEDIUM, SSLTruststoreLocationDoc)
+      .define(SSLTruststorePasswordProp, STRING, Defaults.SSLTruststorePassword, MEDIUM, SSLTruststorePasswordDoc)
+      .define(SSLKeyManagerAlgorithmProp, STRING, Defaults.SSLKeyManagerAlgorithm, MEDIUM, SSLKeyManagerAlgorithmDoc)
+      .define(SSLTrustManagerAlgorithmProp, STRING, Defaults.SSLTrustManagerAlgorithm, MEDIUM, SSLTrustManagerAlgorithmDoc)
+      .define(SSLClientAuthProp, STRING, Defaults.SSLClientAuth, in(Defaults.SSLClientAuthRequired, Defaults.SSLClientAuthRequested, Defaults.SSLClientAuthNone), MEDIUM, SSLClientAuthDoc)
+
   }
 
   def configNames() = {
@@ -569,8 +645,8 @@ object KafkaConfig {
   }
 
   /**
-   * Check that property names are valid
-   */
+    * Check that property names are valid
+    */
   def validateNames(props: Properties) {
     import scala.collection.JavaConversions._
     val names = configDef.names()
@@ -622,7 +698,6 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
     getMap(KafkaConfig.MaxConnectionsPerIpOverridesProp, getString(KafkaConfig.MaxConnectionsPerIpOverridesProp)).map { case (k, v) => (k, v.toInt)}
   val connectionsMaxIdleMs = getLong(KafkaConfig.ConnectionsMaxIdleMsProp)
 
-
   /** ********* Log Configuration ***********/
   val autoCreateTopicsEnable = getBoolean(KafkaConfig.AutoCreateTopicsEnableProp)
   val numPartitions = getInt(KafkaConfig.NumPartitionsProp)
@@ -701,6 +776,22 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
   val metricSampleWindowMs = getLong(KafkaConfig.MetricSampleWindowMsProp)
   val metricReporterClasses: java.util.List[MetricsReporter] = getConfiguredInstances(KafkaConfig.MetricReporterClassesProp, classOf[MetricsReporter])
 
+  /** ********* SSL Configuration **************/
+  val principalBuilderClass = getString(KafkaConfig.PrincipalBuilderClassProp)
+  val sslProtocol = getString(KafkaConfig.SSLProtocolProp)
+  val sslProvider = getString(KafkaConfig.SSLProviderProp)
+  val sslEnabledProtocols = getList(KafkaConfig.SSLEnabledProtocolsProp)
+  val sslKeystoreType = getString(KafkaConfig.SSLKeystoreTypeProp)
+  val sslKeystoreLocation = getString(KafkaConfig.SSLKeystoreLocationProp)
+  val sslKeystorePassword = getString(KafkaConfig.SSLKeystorePasswordProp)
+  val sslKeyPassword = getString(KafkaConfig.SSLKeyPasswordProp)
+  val sslTruststoreType = getString(KafkaConfig.SSLTruststoreTypeProp)
+  val sslTruststoreLocation = getString(KafkaConfig.SSLTruststoreLocationProp)
+  val sslTruststorePassword = getString(KafkaConfig.SSLTruststorePasswordProp)
+  val sslKeyManagerAlgorithm = getString(KafkaConfig.SSLKeyManagerAlgorithmProp)
+  val sslTrustManagerAlgorithm = getString(KafkaConfig.SSLTrustManagerAlgorithmProp)
+  val sslClientAuth = getString(KafkaConfig.SSLClientAuthProp)
+
   /** ********* Quota Configuration **************/
   val producerQuotaBytesPerSecondDefault = getLong(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp)
   val consumerQuotaBytesPerSecondDefault = getLong(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp)
@@ -726,8 +817,8 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
           case None => getInt(KafkaConfig.LogRetentionTimeHoursProp) * millisInHour
         })
 
-      if (millis < 0) return -1
-      millis
+    if (millis < 0) return -1
+    millis
   }
 
   private def getMap(propName: String, propValue: String): Map[String, String] = {
@@ -746,10 +837,12 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
     } catch {
       case e: Exception => throw new IllegalArgumentException("Error creating broker listeners from '%s': %s".format(listeners, e.getMessage))
     }
-    val distinctPorts = endpoints.map(ep => ep.port).distinct
+    // filter port 0 for unit tests
+    val endpointsWithoutZeroPort = endpoints.map(ep => ep.port).filter(_ != 0)
+    val distinctPorts = endpointsWithoutZeroPort.distinct
     val distinctProtocols = endpoints.map(ep => ep.protocolType).distinct
 
-    require(distinctPorts.size == endpoints.size, "Each listener must have a different port")
+    require(distinctPorts.size == endpointsWithoutZeroPort.size, "Each listener must have a different port")
     require(distinctProtocols.size == endpoints.size, "Each listener must have a different protocol")
   }
 
@@ -796,6 +889,11 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
 
   }
 
+
+  private def getPrincipalBuilderClass(principalBuilderClass: String): PrincipalBuilder = {
+    CoreUtils.createObject[PrincipalBuilder](principalBuilderClass)
+  }
+
   validateValues()
 
   private def validateValues() {
@@ -815,4 +913,24 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
     require(BrokerCompressionCodec.isValid(compressionType), "compression.type : " + compressionType + " is not valid." +
       " Valid options are " + BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
   }
+
+  def channelConfigs: java.util.Map[String, Object] = {
+    val channelConfigs = new java.util.HashMap[String, Object]()
+    import kafka.server.KafkaConfig._
+    channelConfigs.put(PrincipalBuilderClassProp, Class.forName(principalBuilderClass))
+    channelConfigs.put(SSLProtocolProp, sslProtocol)
+    channelConfigs.put(SSLEnabledProtocolsProp, sslEnabledProtocols)
+    channelConfigs.put(SSLKeystoreTypeProp, sslKeystoreType)
+    channelConfigs.put(SSLKeystoreLocationProp, sslKeystoreLocation)
+    channelConfigs.put(SSLKeystorePasswordProp, sslKeystorePassword)
+    channelConfigs.put(SSLKeyPasswordProp, sslKeyPassword)
+    channelConfigs.put(SSLTruststoreTypeProp, sslTruststoreType)
+    channelConfigs.put(SSLTruststoreLocationProp, sslTruststoreLocation)
+    channelConfigs.put(SSLTruststorePasswordProp, sslTruststorePassword)
+    channelConfigs.put(SSLKeyManagerAlgorithmProp, sslKeyManagerAlgorithm)
+    channelConfigs.put(SSLTrustManagerAlgorithmProp, sslTrustManagerAlgorithm)
+    channelConfigs.put(SSLClientAuthProp, sslClientAuth)
+    channelConfigs
+  }
+
 }
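
Editor's sketch (not part of the patch): the hunk above registers the broker-side SSL settings on the ConfigDef and surfaces them through the new channelConfigs map. A minimal example of driving these settings through KafkaConfig.fromProps, assuming an SSL listener alongside the PLAINTEXT one and using placeholder keystore paths and passwords:

  import java.util.Properties
  import kafka.server.KafkaConfig

  val props = new Properties()
  props.put("zookeeper.connect", "localhost:2181")          // required by KafkaConfig
  props.put("listeners", "PLAINTEXT://:9092,SSL://:9093")   // hypothetical dual-listener setup
  props.put(KafkaConfig.SSLKeystoreLocationProp, "/var/private/ssl/broker.keystore.jks")    // placeholder path
  props.put(KafkaConfig.SSLKeystorePasswordProp, "keystore-password")                       // placeholder secret
  props.put(KafkaConfig.SSLKeyPasswordProp, "key-password")
  props.put(KafkaConfig.SSLTruststoreLocationProp, "/var/private/ssl/broker.truststore.jks")
  props.put(KafkaConfig.SSLTruststorePasswordProp, "truststore-password")
  props.put(KafkaConfig.SSLClientAuthProp, "required")      // one of required/requested/none, per the in(...) validator above

  val config = KafkaConfig.fromProps(props)
  // channelConfigs collects the resolved ssl.* values (plus the principal builder
  // class loaded via Class.forName) for the broker's network layer.
  val sslChannelConfigs = config.channelConfigs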

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index a0e3fdf..0e7ba3e 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -30,6 +30,7 @@ import java.io.File
 import kafka.utils._
 import org.apache.kafka.common.metrics._
 import org.apache.kafka.common.network.NetworkReceive
+import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.metrics.{JmxReporter, Metrics}
 
 import scala.collection.mutable
@@ -84,9 +85,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
 
   private var shutdownLatch = new CountDownLatch(1)
 
-  private val metricConfig: MetricConfig = new MetricConfig()
-          .samples(config.metricNumSamples)
-          .timeWindow(config.metricSampleWindowMs, TimeUnit.MILLISECONDS)
   private val jmxPrefix: String = "kafka.server"
   private val reporters: java.util.List[MetricsReporter] =  config.metricReporterClasses
   reporters.add(new JmxReporter(jmxPrefix))
@@ -97,6 +95,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
   private val kafkaMetricsTime: org.apache.kafka.common.utils.Time = new org.apache.kafka.common.utils.SystemTime()
   var metrics: Metrics = null
 
+  private val metricConfig: MetricConfig = new MetricConfig()
+    .samples(config.metricNumSamples)
+    .timeWindow(config.metricSampleWindowMs, TimeUnit.MILLISECONDS)
+
   val brokerState: BrokerState = new BrokerState
 
   var apis: KafkaApis = null
@@ -119,7 +121,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
   var kafkaHealthcheck: KafkaHealthcheck = null
   val metadataCache: MetadataCache = new MetadataCache(config.brokerId)
 
-
   var zkClient: ZkClient = null
   val correlationId: AtomicInteger = new AtomicInteger(0)
   val brokerMetaPropsFile = "meta.properties"
@@ -166,63 +167,52 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
         config.brokerId =  getBrokerId
         this.logIdent = "[Kafka Server " + config.brokerId + "], "
 
-        socketServer = new SocketServer(config.brokerId,
-                                        config.listeners,
-                                        config.numNetworkThreads,
-                                        config.queuedMaxRequests,
-                                        config.socketSendBufferBytes,
-                                        config.socketReceiveBufferBytes,
-                                        config.socketRequestMaxBytes,
-                                        config.maxConnectionsPerIp,
-                                        config.connectionsMaxIdleMs,
-                                        config.maxConnectionsPerIpOverrides,
-                                        kafkaMetricsTime,
-                                        metrics)
-          socketServer.startup()
-
-          /* start replica manager */
-          replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)
-          replicaManager.startup()
-
-          /* start kafka controller */
-          kafkaController = new KafkaController(config, zkClient, brokerState)
-          kafkaController.startup()
-
-          /* start kafka coordinator */
-          consumerCoordinator = ConsumerCoordinator.create(config, zkClient, replicaManager, kafkaScheduler)
-          consumerCoordinator.startup()
-
-          /* start processing requests */
-          apis = new KafkaApis(socketServer.requestChannel, replicaManager, consumerCoordinator,
-            kafkaController, zkClient, config.brokerId, config, metadataCache, metrics)
-          requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
-          brokerState.newState(RunningAsBroker)
-
-          Mx4jLoader.maybeLoad()
-
-          /* start dynamic config manager */
-          dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager),
-                                                             ConfigType.Client -> new ClientIdConfigHandler)
-          dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)
-          dynamicConfigManager.startup()
-
-          /* tell everyone we are alive */
-          val listeners = config.advertisedListeners.map {case(protocol, endpoint) =>
-            if (endpoint.port == 0)
-              (protocol, EndPoint(endpoint.host, socketServer.boundPort(), endpoint.protocolType))
-            else
-              (protocol, endpoint)
-          }
-          kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, config.zkSessionTimeoutMs, zkClient)
-          kafkaHealthcheck.startup()
+        socketServer = new SocketServer(config, metrics, kafkaMetricsTime)
+        socketServer.startup()
+
+        /* start replica manager */
+        replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)
+        replicaManager.startup()
+
+        /* start kafka controller */
+        kafkaController = new KafkaController(config, zkClient, brokerState)
+        kafkaController.startup()
+
+        /* start kafka coordinator */
+        consumerCoordinator = ConsumerCoordinator.create(config, zkClient, replicaManager, kafkaScheduler)
+        consumerCoordinator.startup()
+
+        /* start processing requests */
+        apis = new KafkaApis(socketServer.requestChannel, replicaManager, consumerCoordinator,
+          kafkaController, zkClient, config.brokerId, config, metadataCache, metrics)
+        requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
+        brokerState.newState(RunningAsBroker)
+
+        Mx4jLoader.maybeLoad()
+
+        /* start dynamic config manager */
+        dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager),
+                                                           ConfigType.Client -> new ClientIdConfigHandler)
+        dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)
+        dynamicConfigManager.startup()
+
+        /* tell everyone we are alive */
+        val listeners = config.advertisedListeners.map {case(protocol, endpoint) =>
+          if (endpoint.port == 0)
+            (protocol, EndPoint(endpoint.host, socketServer.boundPort(protocol), endpoint.protocolType))
+          else
+            (protocol, endpoint)
+        }
+        kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, config.zkSessionTimeoutMs, zkClient)
+        kafkaHealthcheck.startup()
 
-          /* register broker metrics */
-          registerStats()
+        /* register broker metrics */
+        registerStats()
 
-          shutdownLatch = new CountDownLatch(1)
-          startupComplete.set(true)
-          isStartingUp.set(false)
-          info("started")
+        shutdownLatch = new CountDownLatch(1)
+        startupComplete.set(true)
+        isStartingUp.set(false)
+        info("started")
       }
     }
     catch {
@@ -414,7 +404,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
 
   def getLogManager(): LogManager = logManager
 
-  def boundPort(): Int = socketServer.boundPort()
+  def boundPort(protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): Int = socketServer.boundPort(protocol)
 
   private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = {
     val defaultProps = KafkaServer.copyKafkaConfigToLog(config)
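
Editor's sketch (not part of the patch): with SocketServer now constructed directly from KafkaConfig, a broker can bind one port per security protocol, and boundPort is keyed by protocol (defaulting to PLAINTEXT), as the last hunk above shows. A short usage example, assuming a started server with both PLAINTEXT and SSL listeners:

  import org.apache.kafka.common.protocol.SecurityProtocol

  // server: a started KafkaServer with PLAINTEXT and SSL endpoints configured
  val plaintextPort = server.boundPort()                    // defaults to SecurityProtocol.PLAINTEXT
  val sslPort = server.boundPort(SecurityProtocol.SSL)      // bound port of the SSL endpoint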

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
index 4b6358c..637d6f3 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
@@ -74,7 +74,7 @@ class ProducerSendTest extends KafkaServerTestHarness {
   def testSendOffset() {
     var producer = TestUtils.createNewProducer(brokerList)
     val partition = new Integer(0)
-    
+
     object callback extends Callback {
       var offset = 0L
       def onCompletion(metadata: RecordMetadata, exception: Exception) {
@@ -298,7 +298,7 @@ class ProducerSendTest extends KafkaServerTestHarness {
       }
     }
   }
-  
+
   /**
    * Test that flush immediately sends all accumulated requests.
    */

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala
new file mode 100644
index 0000000..4281036
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala
@@ -0,0 +1,251 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+  * License. You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+  * specific language governing permissions and limitations under the License.
+  */
+package kafka.api
+
+import java.util.Properties
+import java.io.File
+
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.consumer.Consumer
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.clients.consumer.ConsumerRebalanceCallback
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.consumer.CommitType
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.clients.consumer.NoOffsetForPartitionException
+import kafka.integration.KafkaServerTestHarness
+
+import kafka.utils.{TestUtils, Logging}
+import kafka.server.KafkaConfig
+
+import java.util.ArrayList
+import org.junit.{Test, Before, After}
+import org.junit.Assert._
+
+import scala.collection.mutable.Buffer
+import scala.collection.JavaConversions._
+import kafka.coordinator.ConsumerCoordinator
+
+
+/**
+  * Integration tests for the new consumer that cover basic usage as well as server failures
+  */
+class SSLConsumerTest extends KafkaServerTestHarness with Logging {
+
+  val trustStoreFile = File.createTempFile("truststore", ".jks")
+  val numServers = 3
+  val producerCount = 1
+  val consumerCount = 2
+  val producerConfig = new Properties
+  val consumerConfig = new Properties
+
+  val overridingProps = new Properties()
+  overridingProps.put(KafkaConfig.NumPartitionsProp, 4.toString)
+  overridingProps.put(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown
+  overridingProps.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset
+  overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
+  overridingProps.put(KafkaConfig.ConsumerMinSessionTimeoutMsProp, "100") // set small enough session timeout
+
+  val consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
+  val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
+
+  def generateConfigs() =
+    TestUtils.createBrokerConfigs(numServers, zkConnect, false, enableSSL=true, trustStoreFile=Some(trustStoreFile)).map(KafkaConfig.fromProps(_, overridingProps))
+
+  val topic = "topic"
+  val part = 0
+  val tp = new TopicPartition(topic, part)
+
+  // configure the servers and clients
+  this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
+  this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
+  this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
+  this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+
+  @Before
+  override def setUp() {
+    super.setUp()
+    producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getSSLBrokerListStrFromServers(servers))
+    producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArraySerializer])
+    producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArraySerializer])
+    consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getSSLBrokerListStrFromServers(servers))
+    consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
+    consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
+    consumerConfig.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "range")
+
+    for (i <- 0 until producerCount)
+      producers += TestUtils.createNewProducer(TestUtils.getSSLBrokerListStrFromServers(servers),
+                                               acks = 1,
+                                               enableSSL=true,
+                                               trustStoreFile=Some(trustStoreFile))
+    for (i <- 0 until consumerCount)
+      consumers += TestUtils.createNewConsumer(TestUtils.getSSLBrokerListStrFromServers(servers),
+                                               groupId = "my-test",
+                                               partitionAssignmentStrategy= "range",
+                                               enableSSL=true,
+                                               trustStoreFile=Some(trustStoreFile))
+
+
+    // create the consumer offset topic
+    TestUtils.createTopic(zkClient, ConsumerCoordinator.OffsetsTopicName,
+      overridingProps.getProperty(KafkaConfig.OffsetsTopicPartitionsProp).toInt,
+      overridingProps.getProperty(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt,
+      servers,
+      servers(0).consumerCoordinator.offsetsTopicConfigs)
+
+    // create the test topic with all the brokers as replicas
+    TestUtils.createTopic(this.zkClient, topic, 1, numServers, this.servers)
+  }
+
+  @After
+  override def tearDown() {
+    producers.foreach(_.close())
+    consumers.foreach(_.close())
+    super.tearDown()
+  }
+
+  @Test
+  def testSimpleConsumption() {
+    val numRecords = 10000
+    sendRecords(numRecords)
+    assertEquals(0, this.consumers(0).subscriptions.size)
+    this.consumers(0).subscribe(tp)
+    assertEquals(1, this.consumers(0).subscriptions.size)
+    this.consumers(0).seek(tp, 0)
+    consumeRecords(this.consumers(0), numRecords = numRecords, startingOffset = 0)
+  }
+
+  @Test
+  def testAutoOffsetReset() {
+    sendRecords(1)
+    this.consumers(0).subscribe(tp)
+    consumeRecords(this.consumers(0), numRecords = 1, startingOffset = 0)
+  }
+
+  @Test
+  def testSeek() {
+    val consumer = this.consumers(0)
+    val totalRecords = 50L
+    sendRecords(totalRecords.toInt)
+    consumer.subscribe(tp)
+
+    consumer.seekToEnd(tp)
+    assertEquals(totalRecords, consumer.position(tp))
+    assertFalse(consumer.poll(totalRecords).iterator().hasNext)
+
+    consumer.seekToBeginning(tp)
+    assertEquals(0, consumer.position(tp), 0)
+    consumeRecords(consumer, numRecords = 1, startingOffset = 0)
+
+    val mid = totalRecords / 2
+    consumer.seek(tp, mid)
+    assertEquals(mid, consumer.position(tp))
+    consumeRecords(consumer, numRecords = 1, startingOffset = mid.toInt)
+  }
+
+  @Test
+  def testGroupConsumption() {
+    sendRecords(10)
+    this.consumers(0).subscribe(topic)
+    consumeRecords(this.consumers(0), numRecords = 1, startingOffset = 0)
+  }
+
+  @Test
+  def testPositionAndCommit() {
+    sendRecords(5)
+
+    // committed() on a partition with no committed offset throws an exception
+    intercept[NoOffsetForPartitionException] {
+      this.consumers(0).committed(new TopicPartition(topic, 15))
+    }
+
+    // position() on a partition that we aren't subscribed to throws an exception
+    intercept[IllegalArgumentException] {
+      this.consumers(0).position(new TopicPartition(topic, 15))
+    }
+
+    this.consumers(0).subscribe(tp)
+
+    assertEquals("position() on a partition that we are subscribed to should reset the offset", 0L, this.consumers(0).position(tp))
+    this.consumers(0).commit(CommitType.SYNC)
+    assertEquals(0L, this.consumers(0).committed(tp))
+
+    consumeRecords(this.consumers(0), 5, 0)
+    assertEquals("After consuming 5 records, position should be 5", 5L, this.consumers(0).position(tp))
+    this.consumers(0).commit(CommitType.SYNC)
+    assertEquals("Committed offset should be returned", 5L, this.consumers(0).committed(tp))
+
+    sendRecords(1)
+
+    // another consumer in the same group should get the same position
+    this.consumers(1).subscribe(tp)
+    consumeRecords(this.consumers(1), 1, 5)
+  }
+
+  @Test
+  def testPartitionsFor() {
+    val numParts = 2
+    TestUtils.createTopic(this.zkClient, "part-test", numParts, 1, this.servers)
+    val parts = this.consumers(0).partitionsFor("part-test")
+    assertNotNull(parts)
+    assertEquals(2, parts.length)
+    assertNull(this.consumers(0).partitionsFor("non-exist-topic"))
+  }
+
+  private class TestConsumerReassignmentCallback extends ConsumerRebalanceCallback {
+    var callsToAssigned = 0
+    var callsToRevoked = 0
+    def onPartitionsAssigned(consumer: Consumer[_,_], partitions: java.util.Collection[TopicPartition]) {
+      info("onPartitionsAssigned called.")
+      callsToAssigned += 1
+    }
+    def onPartitionsRevoked(consumer: Consumer[_,_], partitions: java.util.Collection[TopicPartition]) {
+      info("onPartitionsRevoked called.")
+      callsToRevoked += 1
+    }
+  }
+
+  private def sendRecords(numRecords: Int) {
+    val futures = (0 until numRecords).map { i =>
+      this.producers(0).send(new ProducerRecord(topic, part, i.toString.getBytes, i.toString.getBytes))
+    }
+    futures.map(_.get)
+  }
+
+  private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]], numRecords: Int, startingOffset: Int) {
+    val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]()
+    val maxIters = numRecords * 300
+    var iters = 0
+    while (records.size < numRecords) {
+      for (record <- consumer.poll(50)) {
+        records.add(record)
+      }
+      if (iters > maxIters)
+        throw new IllegalStateException("Failed to consume the expected records after " + iters + " iterations.")
+      iters += 1
+    }
+    for (i <- 0 until numRecords) {
+      val record = records.get(i)
+      val offset = startingOffset + i
+      assertEquals(topic, record.topic())
+      assertEquals(part, record.partition())
+      assertEquals(offset.toLong, record.offset())
+    }
+  }
+
+}
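
Editor's sketch (not part of the patch): the test above enables SSL through TestUtils helpers (enableSSL=true plus a shared truststore file). A standalone client would instead point at the broker's SSL listener and supply the truststore settings directly; the following assumes the client-side keys match the SSLConfigs constants used in this patch and that the protocol is chosen via a security.protocol property:

  import java.util.Properties
  import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
  import org.apache.kafka.common.config.SSLConfigs
  import org.apache.kafka.common.serialization.ByteArrayDeserializer

  val props = new Properties()
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9093")   // hypothetical SSL listener address
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer])
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer])
  props.put("security.protocol", "SSL")                                // assumed client-side protocol switch
  props.put(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/var/private/ssl/client.truststore.jks")  // placeholder
  props.put(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "truststore-password")                     // placeholder

  val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)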

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/core/src/test/scala/integration/kafka/api/SSLProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SSLProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/SSLProducerSendTest.scala
new file mode 100644
index 0000000..0f70624
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/SSLProducerSendTest.scala
@@ -0,0 +1,240 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.api
+
+import java.util.Properties
+import java.util.concurrent.TimeUnit
+import java.io.File
+
+import kafka.consumer.SimpleConsumer
+import kafka.integration.KafkaServerTestHarness
+import kafka.message.Message
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer._
+import org.apache.kafka.common.config.ConfigException
+import org.apache.kafka.common.errors.SerializationException
+import org.apache.kafka.common.serialization.ByteArraySerializer
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+
+
+class SSLProducerSendTest extends KafkaServerTestHarness {
+  val numServers = 2
+  val trustStoreFile = File.createTempFile("truststore", ".jks")
+
+  val overridingProps = new Properties()
+  overridingProps.put(KafkaConfig.NumPartitionsProp, 4.toString)
+
+  def generateConfigs() =
+    TestUtils.createBrokerConfigs(numServers, zkConnect, false, enableSSL=true, trustStoreFile=Some(trustStoreFile)).map(KafkaConfig.fromProps(_, overridingProps))
+
+  private var consumer1: SimpleConsumer = null
+  private var consumer2: SimpleConsumer = null
+
+  private val topic = "topic"
+  private val numRecords = 100
+
+  @Before
+  override def setUp() {
+    super.setUp()
+
+    // TODO: we need to migrate to new consumers when 0.9 is final
+    consumer1 = new SimpleConsumer("localhost", servers(0).boundPort(), 100, 1024*1024, "")
+    consumer2 = new SimpleConsumer("localhost", servers(1).boundPort(), 100, 1024*1024, "")
+
+  }
+
+  @After
+  override def tearDown() {
+    consumer1.close()
+    consumer2.close()
+    super.tearDown()
+  }
+
+  /**
+    * testSendOffset checks the basic send API behavior
+    *
+    * 1. Send with null key/value/partition-id should be accepted; send with null topic should be rejected.
+    * 2. Last message of the non-blocking send should return the correct offset metadata
+    */
+  @Test
+  def testSendOffset() {
+    var sslProducer = TestUtils.createNewProducer(TestUtils.getSSLBrokerListStrFromServers(servers), enableSSL=true, trustStoreFile=Some(trustStoreFile))
+    var producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers))
+    val partition = new Integer(0)
+
+    object callback extends Callback {
+      var offset = 0L
+      def onCompletion(metadata: RecordMetadata, exception: Exception) {
+        if (exception == null) {
+          assertEquals(offset, metadata.offset())
+          assertEquals(topic, metadata.topic())
+          assertEquals(partition, metadata.partition())
+          offset += 1
+        } else {
+          fail("Send callback returns the following exception", exception)
+        }
+      }
+    }
+
+    try {
+      // create topic
+      TestUtils.createTopic(zkClient, topic, 1, 2, servers)
+
+      // send a normal record
+      val record0 = new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, "key".getBytes, "value".getBytes)
+      assertEquals("Should have offset 0", 0L, sslProducer.send(record0, callback).get.offset)
+
+      // send a record with null value should be ok
+      val record1 = new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, "key".getBytes, null)
+      assertEquals("Should have offset 1", 1L, sslProducer.send(record1, callback).get.offset)
+
+      // send a record with null key should be ok
+      val record2 = new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, null, "value".getBytes)
+      assertEquals("Should have offset 2", 2L, sslProducer.send(record2, callback).get.offset)
+
+      // send a record with null part id should be ok
+      val record3 = new ProducerRecord[Array[Byte],Array[Byte]](topic, null, "key".getBytes, "value".getBytes)
+      assertEquals("Should have offset 3", 3L, sslProducer.send(record3, callback).get.offset)
+
+      // send a record with null topic should fail
+      try {
+        val record4 = new ProducerRecord[Array[Byte],Array[Byte]](null, partition, "key".getBytes, "value".getBytes)
+        sslProducer.send(record4, callback)
+        fail("Should not allow sending a record without topic")
+      } catch {
+        case iae: IllegalArgumentException => // this is ok
+        case e: Throwable => fail("Only expecting IllegalArgumentException", e)
+      }
+
+      // non-blocking send a list of records with sslProducer
+      for (i <- 1 to numRecords)
+        sslProducer.send(record0, callback)
+      // check that all messages have been acked via offset
+      assertEquals("Should have offset " + numRecords + 4L, numRecords + 4L, sslProducer.send(record0, callback).get.offset)
+
+      //non-blocking send a list of records with plaintext producer
+      for (i <- 1 to numRecords)
+        producer.send(record0, callback)
+
+      // check that all messages have been acked via offset
+      assertEquals("Should have offset " + (numRecords * 2 + 5L), numRecords * 2 + 5L, producer.send(record0, callback).get.offset)
+
+    } finally {
+      if (sslProducer != null) {
+        sslProducer.close()
+        sslProducer = null
+      }
+      if (producer != null) {
+        producer.close()
+        producer = null
+      }
+
+    }
+  }
+
+  /**
+    * testClose checks the closing behavior
+    *
+    * After close() returns, all messages should be sent with correct returned offset metadata
+    */
+  @Test
+  def testClose() {
+    var producer = TestUtils.createNewProducer(TestUtils.getSSLBrokerListStrFromServers(servers), enableSSL=true, trustStoreFile=Some(trustStoreFile))
+    try {
+      // create topic
+      TestUtils.createTopic(zkClient, topic, 1, 2, servers)
+
+      // non-blocking send a list of records
+      val record0 = new ProducerRecord[Array[Byte],Array[Byte]](topic, null, "key".getBytes, "value".getBytes)
+      for (i <- 1 to numRecords)
+        producer.send(record0)
+      val response0 = producer.send(record0)
+
+      // close the producer
+      producer.close()
+      producer = null
+
+      // check that all messages have been acked via offset,
+      // this also checks that messages with same key go to the same partition
+      assertTrue("The last message should be acked before producer is shutdown", response0.isDone)
+      assertEquals("Should have offset " + numRecords, numRecords.toLong, response0.get.offset)
+
+    } finally {
+      if (producer != null) {
+        producer.close()
+        producer = null
+      }
+    }
+  }
+
+  /**
+    * testSendToPartition checks the partitioning behavior
+    *
+    * The specified partition-id should be respected
+    */
+  @Test
+  def testSendToPartition() {
+    var producer = TestUtils.createNewProducer(TestUtils.getSSLBrokerListStrFromServers(servers), enableSSL=true, trustStoreFile=Some(trustStoreFile))
+    try {
+      // create topic
+      val leaders = TestUtils.createTopic(zkClient, topic, 2, 2, servers)
+      val partition = 1
+
+      // make sure leaders exist
+      val leader1 = leaders(partition)
+      assertTrue("Leader for topic \"topic\" partition 1 should exist", leader1.isDefined)
+
+      val responses =
+        for (i <- 1 to numRecords)
+        yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, null, ("value" + i).getBytes))
+      val futures = responses.toList
+      futures.map(_.get)
+      for (future <- futures)
+        assertTrue("Request should have completed", future.isDone)
+
+      // make sure all of them end up in the same partition with increasing offset values
+      for ((future, offset) <- futures zip (0 until numRecords)) {
+        assertEquals(offset.toLong, future.get.offset)
+        assertEquals(topic, future.get.topic)
+        assertEquals(partition, future.get.partition)
+      }
+
+      // make sure the fetched messages also respect the partitioning and ordering
+      val fetchResponse1 = if (leader1.get == configs(0).brokerId) {
+        consumer1.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build())
+      } else {
+        consumer2.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build())
+      }
+      val messageSet1 = fetchResponse1.messageSet(topic, partition).iterator.toBuffer
+      assertEquals("Should have fetched " + numRecords + " messages", numRecords, messageSet1.size)
+
+      // TODO: also check topic and partition after they are added in the return messageSet
+      for (i <- 0 to numRecords - 1) {
+        assertEquals(new Message(bytes = ("value" + (i + 1)).getBytes), messageSet1(i).message)
+        assertEquals(i.toLong, messageSet1(i).offset)
+      }
+    } finally {
+      if (producer != null) {
+        producer.close()
+        producer = null
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 05b9a87..ed94039 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -46,7 +46,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
     configs = (0 until 4).map(i => KafkaConfig.fromProps(TestUtils.createBrokerConfig(i, zkConnect, enableControlledShutdown = false)))
     // start all the servers
     servers = configs.map(c => TestUtils.createServer(c))
-    brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, s.boundPort))
+    brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, s.boundPort()))
 
     // create topics first
     createTopic(zkClient, topic1, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index 4dba7dc..e4f3576 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -112,9 +112,9 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
 
   @Test
   def testUncleanLeaderElectionDisabled {
-	// disable unclean leader election
-	configProps1.put("unclean.leader.election.enable", String.valueOf(false))
-	configProps2.put("unclean.leader.election.enable", String.valueOf(false))
+    // disable unclean leader election
+    configProps1.put("unclean.leader.election.enable", String.valueOf(false))
+    configProps2.put("unclean.leader.election.enable", String.valueOf(false))
     startBrokers(Seq(configProps1, configProps2))
 
     // create topic with 1 partition, 2 replicas, one on each broker

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index d08b8b8..1937943 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -19,6 +19,8 @@ package kafka.network;
 
 import java.io._
 import java.net._
+import javax.net.ssl._
+import java.io._
 import java.nio.ByteBuffer
 import java.util.Random
 
@@ -33,24 +35,31 @@ import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.utils.SystemTime
 import org.junit.Assert._
 import org.junit._
+import org.scalatest.junit.JUnitSuite
+import java.util.Random
+import kafka.producer.SyncProducerConfig
+import kafka.api.ProducerRequest
+import java.nio.ByteBuffer
+import kafka.common.TopicAndPartition
+import kafka.message.ByteBufferMessageSet
+import kafka.server.KafkaConfig
+import java.nio.channels.SelectionKey
+import kafka.utils.TestUtils
 
 import scala.collection.Map
 
-class SocketServerTest {
-
-  val server: SocketServer = new SocketServer(0,
-                                              Map(SecurityProtocol.PLAINTEXT -> EndPoint(null, 0, SecurityProtocol.PLAINTEXT),
-                                                  SecurityProtocol.TRACE -> EndPoint(null, 0, SecurityProtocol.TRACE)),
-                                              numProcessorThreads = 1,
-                                              maxQueuedRequests = 50,
-                                              sendBufferSize = 300000,
-                                              recvBufferSize = 300000,
-                                              maxRequestSize = 50,
-                                              maxConnectionsPerIp = 5,
-                                              connectionsMaxIdleMs = 60*1000,
-                                              maxConnectionsPerIpOverrides = Map.empty[String,Int],
-                                              new SystemTime(),
-                                              new Metrics())
+class SocketServerTest extends JUnitSuite {
+  val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0)
+  props.put("listeners", "PLAINTEXT://localhost:0,TRACE://localhost:0")
+  props.put("num.network.threads", "1")
+  props.put("socket.send.buffer.bytes", "300000")
+  props.put("socket.receive.buffer.bytes", "300000")
+  props.put("queued.max.requests", "50")
+  props.put("socket.request.max.bytes", "50")
+  props.put("max.connections.per.ip", "5")
+  props.put("connections.max.idle.ms", "60000")
+  val config: KafkaConfig = KafkaConfig.fromProps(props)
+  val server: SocketServer = new SocketServer(config, new Metrics(), new SystemTime())
   server.startup()
 
   def sendRequest(socket: Socket, id: Short, request: Array[Byte]) {
@@ -177,18 +186,8 @@ class SocketServerTest {
   def testMaxConnectionsPerIPOverrides() {
     val overrideNum = 6
     val overrides: Map[String, Int] = Map("localhost" -> overrideNum)
-    val overrideServer: SocketServer = new SocketServer(0,
-                                                Map(SecurityProtocol.PLAINTEXT -> EndPoint(null, 0, SecurityProtocol.PLAINTEXT)),
-                                                numProcessorThreads = 1,
-                                                maxQueuedRequests = 50,
-                                                sendBufferSize = 300000,
-                                                recvBufferSize = 300000,
-                                                maxRequestSize = 50,
-                                                maxConnectionsPerIp = 5,
-                                                connectionsMaxIdleMs = 60*1000,
-                                                maxConnectionsPerIpOverrides = overrides,
-                                                new SystemTime(),
-                                                new Metrics())
+    val overrideprops = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0)
+    val overrideServer: SocketServer = new SocketServer(KafkaConfig.fromProps(overrideprops), new Metrics(), new SystemTime())
     overrideServer.startup()
     // make the maximum allowable number of connections and then leak them
     val conns = ((0 until overrideNum).map(i => connect(overrideServer)))
@@ -198,4 +197,37 @@ class SocketServerTest {
     assertEquals(-1, conn.getInputStream.read())
     overrideServer.shutdown()
   }
+
+  @Test
+  def testSSLSocketServer(): Unit = {
+    val trustStoreFile = File.createTempFile("truststore", ".jks")
+    val overrideprops = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0, enableSSL = true, trustStoreFile = Some(trustStoreFile))
+    overrideprops.put("listeners", "SSL://localhost:0")
+
+    val overrideServer: SocketServer = new SocketServer(KafkaConfig.fromProps(overrideprops), new Metrics(), new SystemTime())
+    overrideServer.startup()
+    val sslContext = SSLContext.getInstance("TLSv1.2")
+    sslContext.init(null, Array(TestUtils.trustAllCerts), new java.security.SecureRandom())
+    val socketFactory = sslContext.getSocketFactory
+    val sslSocket = socketFactory.createSocket("localhost", overrideServer.boundPort(SecurityProtocol.SSL)).asInstanceOf[SSLSocket]
+    sslSocket.setNeedClientAuth(false)
+
+    val correlationId = -1
+    val clientId = SyncProducerConfig.DefaultClientId
+    val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
+    val ack = SyncProducerConfig.DefaultRequiredAcks
+    val emptyRequest =
+      new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]())
+
+    val byteBuffer = ByteBuffer.allocate(emptyRequest.sizeInBytes)
+    emptyRequest.writeTo(byteBuffer)
+    byteBuffer.rewind()
+    val serializedBytes = new Array[Byte](byteBuffer.remaining)
+    byteBuffer.get(serializedBytes)
+
+    sendRequest(sslSocket, 0, serializedBytes)
+    processRequest(overrideServer.requestChannel)
+    assertEquals(serializedBytes.toSeq, receiveResponse(sslSocket).toSeq)
+    overrideServer.shutdown()
+  }
 }
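
On the broker side, testSSLSocketServer above relies on TestUtils to generate the SSL settings. A standalone broker would declare an SSL endpoint next to the PLAINTEXT one and point at its key and trust stores roughly as follows. This is a sketch only; the ssl.* property names are assumed from the SSLConfigs/KafkaConfig properties exercised elsewhere in this patch, and the paths and passwords are placeholders.

    import java.util.Properties
    import kafka.server.KafkaConfig

    val props = new Properties()
    props.put("broker.id", "0")
    props.put("zookeeper.connect", "localhost:2181")
    props.put("log.dir", "/tmp/kafka-logs")
    // one PLAINTEXT and one SSL listener, mirroring what createBrokerConfig builds when enableSSL = true
    props.put("listeners", "PLAINTEXT://localhost:9092,SSL://localhost:9093")
    props.put("ssl.keystore.location", "/path/to/server.keystore.jks")       // assumed key names
    props.put("ssl.keystore.password", "server-keystore-password")
    props.put("ssl.key.password", "server-key-password")
    props.put("ssl.truststore.location", "/path/to/server.truststore.jks")
    props.put("ssl.truststore.password", "server-truststore-password")

    val config = KafkaConfig.fromProps(props)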

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 9688b8c..3da666f 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -38,7 +38,7 @@ class KafkaConfigTest {
     val cfg = KafkaConfig.fromProps(props)
     assertEquals(60L * 60L * 1000L, cfg.logRetentionTimeMillis)
   }
-  
+
   @Test
   def testLogRetentionTimeMinutesProvided() {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
@@ -47,7 +47,7 @@ class KafkaConfigTest {
     val cfg = KafkaConfig.fromProps(props)
     assertEquals(30 * 60L * 1000L, cfg.logRetentionTimeMillis)
   }
-  
+
   @Test
   def testLogRetentionTimeMsProvided() {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
@@ -56,7 +56,7 @@ class KafkaConfigTest {
     val cfg = KafkaConfig.fromProps(props)
     assertEquals(30 * 60L * 1000L, cfg.logRetentionTimeMillis)
   }
-  
+
   @Test
   def testLogRetentionTimeNoConfigProvided() {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
@@ -64,7 +64,7 @@ class KafkaConfigTest {
     val cfg = KafkaConfig.fromProps(props)
     assertEquals(24 * 7 * 60L * 60L * 1000L, cfg.logRetentionTimeMillis)
   }
-  
+
   @Test
   def testLogRetentionTimeBothMinutesAndHoursProvided() {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
@@ -74,7 +74,7 @@ class KafkaConfigTest {
     val cfg = KafkaConfig.fromProps(props)
     assertEquals( 30 * 60L * 1000L, cfg.logRetentionTimeMillis)
   }
-  
+
   @Test
   def testLogRetentionTimeBothMinutesAndMsProvided() {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
@@ -166,11 +166,11 @@ class KafkaConfigTest {
     val serverConfig = KafkaConfig.fromProps(props)
     val endpoints = serverConfig.advertisedListeners
     val endpoint = endpoints.get(SecurityProtocol.PLAINTEXT).get
-    
+
     assertEquals(endpoint.host, advertisedHostName)
     assertEquals(endpoint.port, advertisedPort.toInt)
   }
-    
+
   @Test
   def testAdvertisePortDefault() {
     val advertisedHostName = "routable-host"
@@ -187,7 +187,7 @@ class KafkaConfigTest {
     assertEquals(endpoint.host, advertisedHostName)
     assertEquals(endpoint.port, port.toInt)
   }
-  
+
   @Test
   def testAdvertiseHostNameDefault() {
     val hostName = "routable-host"
@@ -203,8 +203,8 @@ class KafkaConfigTest {
 
     assertEquals(endpoint.host, hostName)
     assertEquals(endpoint.port, advertisedPort.toInt)
-  }    
-    
+  }
+
   @Test
   def testDuplicateListeners() {
     val props = new Properties()
@@ -328,7 +328,7 @@ class KafkaConfigTest {
       KafkaConfig.fromProps(props)
     }
   }
-  
+
   @Test
   def testLogRollTimeMsProvided() {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
@@ -337,7 +337,7 @@ class KafkaConfigTest {
     val cfg = KafkaConfig.fromProps(props)
     assertEquals(30 * 60L * 1000L, cfg.logRollTimeMillis)
   }
-  
+
   @Test
   def testLogRollTimeBothMsAndHoursProvided() {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
@@ -347,7 +347,7 @@ class KafkaConfigTest {
     val cfg = KafkaConfig.fromProps(props)
     assertEquals( 30 * 60L * 1000L, cfg.logRollTimeMillis)
   }
-    
+
   @Test
   def testLogRollTimeNoConfigProvided() {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
@@ -487,6 +487,22 @@ class KafkaConfigTest {
         case KafkaConfig.MetricSampleWindowMsProp => assertPropertyInvalid(getBaseProperties, name, "not_a_number", "-1", "0")
         case KafkaConfig.MetricReporterClassesProp => // ignore string
 
+        //SSL Configs
+        case KafkaConfig.PrincipalBuilderClassProp =>
+        case KafkaConfig.SSLProtocolProp => // ignore string
+        case KafkaConfig.SSLProviderProp => // ignore string
+        case KafkaConfig.SSLEnabledProtocolsProp =>
+        case KafkaConfig.SSLKeystoreTypeProp => // ignore string
+        case KafkaConfig.SSLKeystoreLocationProp => // ignore string
+        case KafkaConfig.SSLKeystorePasswordProp => // ignore string
+        case KafkaConfig.SSLKeyPasswordProp => // ignore string
+        case KafkaConfig.SSLTruststoreTypeProp => // ignore string
+        case KafkaConfig.SSLTruststorePasswordProp => // ignore string
+        case KafkaConfig.SSLTruststoreLocationProp => // ignore string
+        case KafkaConfig.SSLKeyManagerAlgorithmProp =>
+        case KafkaConfig.SSLTrustManagerAlgorithmProp =>
+        case KafkaConfig.SSLClientAuthProp => // ignore string
+
         case nonNegativeIntProperty => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1")
       }
     })

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index f00f00a..00fbb61 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -22,6 +22,8 @@ import java.nio._
 import java.nio.channels._
 import java.util.Random
 import java.util.Properties
+import java.security.cert.X509Certificate
+import javax.net.ssl.X509TrustManager
 import charset.Charset
 
 import org.apache.kafka.common.protocol.SecurityProtocol
@@ -45,9 +47,16 @@ import kafka.log._
 
 import org.junit.Assert._
 import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.clients.consumer.ConsumerRebalanceCallback
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.security.ssl.SSLFactory
+import org.apache.kafka.common.config.SSLConfigs
+import org.apache.kafka.test.TestSSLUtils
 
 import scala.collection.Map
-import org.apache.kafka.clients.consumer.KafkaConsumer
+import scala.collection.JavaConversions._
 
 /**
  * Utility functions to help with testing
@@ -132,24 +141,33 @@ object TestUtils extends Logging {
   def createBrokerConfigs(numConfigs: Int,
     zkConnect: String,
     enableControlledShutdown: Boolean = true,
-    enableDeleteTopic: Boolean = false): Seq[Properties] = {
-    (0 until numConfigs).map(node => createBrokerConfig(node, zkConnect, enableControlledShutdown, enableDeleteTopic))
+    enableDeleteTopic: Boolean = false,
+    enableSSL: Boolean = false,
+    trustStoreFile: Option[File] = None): Seq[Properties] = {
+    (0 until numConfigs).map(node => createBrokerConfig(node, zkConnect, enableControlledShutdown, enableDeleteTopic, enableSSL = enableSSL, trustStoreFile = trustStoreFile))
   }
 
   def getBrokerListStrFromServers(servers: Seq[KafkaServer]): String = {
     servers.map(s => formatAddress(s.config.hostName, s.boundPort())).mkString(",")
   }
 
+  def getSSLBrokerListStrFromServers(servers: Seq[KafkaServer]): String = {
+    servers.map(s => formatAddress(s.config.hostName, s.boundPort(SecurityProtocol.SSL))).mkString(",")
+  }
+
   /**
    * Create a test config for the given node id
    */
   def createBrokerConfig(nodeId: Int, zkConnect: String,
     enableControlledShutdown: Boolean = true,
     enableDeleteTopic: Boolean = false,
-    port: Int = RandomPort): Properties = {
+    port: Int = RandomPort, enableSSL: Boolean = false, sslPort: Int = RandomPort, trustStoreFile: Option[File] = None): Properties = {
     val props = new Properties
+    var listeners: String = "PLAINTEXT://localhost:"+port.toString
     if (nodeId >= 0) props.put("broker.id", nodeId.toString)
-    props.put("listeners", "PLAINTEXT://localhost:"+port.toString)
+    if (enableSSL)
+      listeners = listeners + "," + "SSL://localhost:"+sslPort.toString
+    props.put("listeners", listeners)
     props.put("log.dir", TestUtils.tempDir().getAbsolutePath)
     props.put("zookeeper.connect", zkConnect)
     props.put("replica.socket.timeout.ms", "1500")
@@ -157,6 +175,9 @@ object TestUtils extends Logging {
     props.put("controlled.shutdown.enable", enableControlledShutdown.toString)
     props.put("delete.topic.enable", enableDeleteTopic.toString)
     props.put("controlled.shutdown.retry.backoff.ms", "100")
+    if (enableSSL) {
+      props.putAll(addSSLConfigs(SSLFactory.Mode.SERVER, true, trustStoreFile, "server"+nodeId))
+    }
     props.put("port", port.toString)
     props
   }
@@ -381,7 +402,9 @@ object TestUtils extends Logging {
                         blockOnBufferFull: Boolean = true,
                         bufferSize: Long = 1024L * 1024L,
                         retries: Int = 0,
-                        lingerMs: Long = 0) : KafkaProducer[Array[Byte],Array[Byte]] = {
+                        lingerMs: Long = 0,
+                        enableSSL: Boolean = false,
+                        trustStoreFile: Option[File] = None) : KafkaProducer[Array[Byte],Array[Byte]] = {
     import org.apache.kafka.clients.producer.ProducerConfig
 
     val producerProps = new Properties()
@@ -396,6 +419,10 @@ object TestUtils extends Logging {
     producerProps.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs.toString)
     producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
     producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
+    if (enableSSL) {
+      producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL")
+      producerProps.putAll(addSSLConfigs(SSLFactory.Mode.CLIENT, false, trustStoreFile, "producer"))
+    }
     new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
   }
 
@@ -405,7 +432,12 @@ object TestUtils extends Logging {
   def createNewConsumer(brokerList: String,
                         groupId: String,
                         autoOffsetReset: String = "earliest",
-                        partitionFetchSize: Long = 4096L) : KafkaConsumer[Array[Byte],Array[Byte]] = {
+                        partitionFetchSize: Long = 4096L,
+                        partitionAssignmentStrategy: String = "blah",
+                        sessionTimeout: Int = 30000,
+                        callback: Option[ConsumerRebalanceCallback] = None,
+                        enableSSL: Boolean = false,
+                        trustStoreFile: Option[File] = None) : KafkaConsumer[Array[Byte],Array[Byte]] = {
     import org.apache.kafka.clients.consumer.ConsumerConfig
 
     val consumerProps= new Properties()
@@ -417,7 +449,17 @@ object TestUtils extends Logging {
     consumerProps.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200")
     consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
     consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
-    new KafkaConsumer[Array[Byte],Array[Byte]](consumerProps)
+    consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, partitionAssignmentStrategy)
+    consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout.toString)
+    if (enableSSL) {
+      consumerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL")
+      consumerProps.putAll(addSSLConfigs(SSLFactory.Mode.CLIENT, false, trustStoreFile, "consumer"))
+    }
+    if (callback.isDefined) {
+      new KafkaConsumer[Array[Byte],Array[Byte]](consumerProps, callback.get, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+    } else {
+      new KafkaConsumer[Array[Byte],Array[Byte]](consumerProps)
+    }
   }
 
   /**
@@ -873,6 +915,37 @@ object TestUtils extends Logging {
     new String(bytes, encoding)
   }
 
+  def addSSLConfigs(mode: SSLFactory.Mode, clientCert: Boolean, trustStoreFile: Option[File],  certAlias: String): Properties = {
+    var sslConfigs: java.util.Map[String, Object] = new java.util.HashMap[String, Object]()
+    if (!trustStoreFile.isDefined) {
+      throw new Exception("enableSSL set to true but no trustStoreFile provided")
+    }
+    if (mode == SSLFactory.Mode.SERVER)
+      sslConfigs = TestSSLUtils.createSSLConfig(true, true, mode, trustStoreFile.get, certAlias)
+    else
+      sslConfigs = TestSSLUtils.createSSLConfig(clientCert, false, mode, trustStoreFile.get, certAlias)
+
+    val sslProps = new Properties()
+    sslConfigs.foreach(kv =>
+      sslProps.put(kv._1, kv._2)
+    )
+    sslProps
+  }
+
+  // an X509TrustManager that trusts self-signed certs for unit tests.
+  def trustAllCerts: X509TrustManager = {
+    val trustManager = new X509TrustManager() {
+      override def getAcceptedIssuers: Array[X509Certificate] = {
+        null
+      }
+      override def checkClientTrusted(certs: Array[X509Certificate], authType: String) {
+      }
+      override def checkServerTrusted(certs: Array[X509Certificate], authType: String) {
+      }
+    }
+    trustManager
+  }
+
 }
 
 class IntEncoder(props: VerifiableProperties = null) extends Encoder[Int] {
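
Taken together, the TestUtils changes above let an integration test bring up SSL-enabled brokers and a matching client from one shared truststore file. A minimal sketch of the intended usage, assuming the zkConnect field supplied by ZooKeeperTestHarness as in the other tests in this patch:

    import java.io.File
    import kafka.server.KafkaConfig
    import kafka.utils.TestUtils

    val trustStoreFile = File.createTempFile("truststore", ".jks")

    // broker configs get an extra SSL://localhost:<port> listener plus the server-side keystore/truststore settings
    val configs = TestUtils.createBrokerConfigs(2, zkConnect, enableSSL = true, trustStoreFile = Some(trustStoreFile))
      .map(p => KafkaConfig.fromProps(p))
    val servers = configs.map(c => TestUtils.createServer(c))

    // the producer is pointed at the SSL endpoints and gets the client-side SSL configs injected
    val producer = TestUtils.createNewProducer(
      TestUtils.getSSLBrokerListStrFromServers(servers),
      enableSSL = true,
      trustStoreFile = Some(trustStoreFile))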


[4/4] kafka git commit: kafka-1690; Add SSL support to Kafka Broker, Producer and Consumer; patched by Sriharsha Chintalapani; reviewed Rajini Sivaram, Joel Koshy, Michael Herstine, Ismael Juma, Dong Lin, Jiangjie Qin and Jun Rao

Posted by ju...@apache.org.
kafka-1690; Add SSL support to Kafka Broker, Producer and Consumer; patched by Sriharsha Chintalapani; reviewed Rajini Sivaram, Joel Koshy, Michael Herstine, Ismael Juma, Dong Lin, Jiangjie Qin and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9e2c683f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9e2c683f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9e2c683f

Branch: refs/heads/trunk
Commit: 9e2c683f550b7ae58008d0bcb62238b7a2d89a65
Parents: 503bd36
Author: Sriharsha Chintalapani <sc...@hortonworks.com>
Authored: Tue Aug 18 21:51:15 2015 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Aug 18 21:51:15 2015 -0700

----------------------------------------------------------------------
 build.gradle                                    |   6 +-
 checkstyle/import-control.xml                   |  52 +-
 .../org/apache/kafka/clients/ClientUtils.java   |  37 +-
 .../kafka/clients/CommonClientConfigs.java      |  18 +-
 .../org/apache/kafka/clients/NetworkClient.java |  41 +-
 .../kafka/clients/consumer/ConsumerConfig.java  |  26 +-
 .../kafka/clients/consumer/KafkaConsumer.java   |  92 +--
 .../kafka/clients/producer/KafkaProducer.java   |   7 +-
 .../kafka/clients/producer/ProducerConfig.java  |  18 +
 .../kafka/common/config/AbstractConfig.java     |  10 +-
 .../apache/kafka/common/config/SSLConfigs.java  | 102 +++
 .../kafka/common/network/Authenticator.java     |  62 ++
 .../kafka/common/network/ByteBufferSend.java    |  13 +-
 .../kafka/common/network/ChannelBuilder.java    |  44 ++
 .../common/network/DefaultAuthenticator.java    |  63 ++
 .../kafka/common/network/KafkaChannel.java      | 166 +++++
 .../kafka/common/network/NetworkReceive.java    |   5 +-
 .../common/network/PlaintextChannelBuilder.java |  58 ++
 .../common/network/PlaintextTransportLayer.java | 217 ++++++
 .../kafka/common/network/SSLChannelBuilder.java |  68 ++
 .../kafka/common/network/SSLTransportLayer.java | 690 +++++++++++++++++++
 .../apache/kafka/common/network/Selectable.java |  12 +-
 .../apache/kafka/common/network/Selector.java   | 316 +++++----
 .../org/apache/kafka/common/network/Send.java   |   6 +-
 .../kafka/common/network/TransportLayer.java    |  86 +++
 .../kafka/common/protocol/SecurityProtocol.java |   2 +
 .../security/auth/DefaultPrincipalBuilder.java  |  43 ++
 .../common/security/auth/KafkaPrincipal.java    |  58 ++
 .../common/security/auth/PrincipalBuilder.java  |  51 ++
 .../kafka/common/security/ssl/SSLFactory.java   | 210 ++++++
 .../org/apache/kafka/common/utils/Utils.java    |  53 +-
 .../apache/kafka/clients/ClientUtilsTest.java   |   2 +-
 .../clients/producer/KafkaProducerTest.java     |  19 +-
 .../apache/kafka/common/network/EchoServer.java | 119 ++++
 .../kafka/common/network/SSLSelectorTest.java   | 276 ++++++++
 .../kafka/common/network/SelectorTest.java      | 110 ++-
 .../common/security/ssl/SSLFactoryTest.java     |  60 ++
 .../apache/kafka/common/utils/UtilsTest.java    |   4 +-
 .../org/apache/kafka/test/MockSelector.java     |   8 +-
 .../org/apache/kafka/test/TestSSLUtils.java     | 243 +++++++
 .../main/scala/kafka/api/FetchResponse.scala    |  24 +-
 .../main/scala/kafka/network/SocketServer.scala | 175 +++--
 .../main/scala/kafka/server/KafkaConfig.scala   | 218 ++++--
 .../main/scala/kafka/server/KafkaServer.scala   | 110 ++-
 .../kafka/api/ProducerSendTest.scala            |   4 +-
 .../integration/kafka/api/SSLConsumerTest.scala | 251 +++++++
 .../kafka/api/SSLProducerSendTest.scala         | 240 +++++++
 .../unit/kafka/admin/AddPartitionsTest.scala    |   2 +-
 .../integration/UncleanLeaderElectionTest.scala |   6 +-
 .../unit/kafka/network/SocketServerTest.scala   |  86 ++-
 .../unit/kafka/server/KafkaConfigTest.scala     |  42 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |  89 ++-
 52 files changed, 4121 insertions(+), 599 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 983587f..17fc223 100644
--- a/build.gradle
+++ b/build.gradle
@@ -256,9 +256,10 @@ project(':core') {
     testCompile "$junit"
     testCompile "$easymock"
     testCompile 'org.objenesis:objenesis:1.2'
+    testCompile 'org.bouncycastle:bcpkix-jdk15on:1.52'
     testCompile "org.scalatest:scalatest_$baseScalaVersion:2.2.5"
-    testCompile project(path: ':clients', configuration: 'archives')
-
+    testCompile project(':clients')
+    testCompile project(':clients').sourceSets.test.output
     testRuntime "$slf4jlog4j"
 
     zinc 'com.typesafe.zinc:zinc:0.3.7'
@@ -390,6 +391,7 @@ project(':clients') {
     compile 'org.xerial.snappy:snappy-java:1.1.1.7'
     compile 'net.jpountz.lz4:lz4:1.2.0'
 
+    testCompile 'org.bouncycastle:bcpkix-jdk15on:1.52'
     testCompile "$junit"
     testRuntime "$slf4jlog4j"
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index e3f4f84..eb682f4 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -1,6 +1,6 @@
 <!DOCTYPE import-control PUBLIC
-    "-//Puppy Crawl//DTD Import Control 1.1//EN"
-    "http://www.puppycrawl.com/dtds/import_control_1_1.dtd">
+"-//Puppy Crawl//DTD Import Control 1.1//EN"
+"http://www.puppycrawl.com/dtds/import_control_1_1.dtd">
 <!--
 // Licensed to the Apache Software Foundation (ASF) under one or more
 // contributor license agreements.  See the NOTICE file distributed with
@@ -8,68 +8,79 @@
 // The ASF licenses this file to You under the Apache License, Version 2.0
 // (the "License"); you may not use this file except in compliance with
 // the License.  You may obtain a copy of the License at
-// 
+//
 //    http://www.apache.org/licenses/LICENSE-2.0
-// 
+//
 // Unless required by applicable law or agreed to in writing, software
 // distributed under the License is distributed on an "AS IS" BASIS,
 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
---> 
+-->
+
 <import-control pkg="org.apache.kafka">
-	
+
 	<!-- THINK HARD ABOUT THE LAYERING OF THE PROJECT BEFORE CHANGING THIS FILE -->
-	
+
 	<!-- common library dependencies -->
 	<allow pkg="java" />
 	<allow pkg="javax.management" />
 	<allow pkg="org.slf4j" />
 	<allow pkg="org.junit" />
-	
+	<allow pkg="javax.net.ssl" />
+
 	<!-- no one depends on the server -->
 	<disallow pkg="kafka" />
-	
+
 	<!-- anyone can use public classes -->
 	<allow pkg="org.apache.kafka.common" exact-match="true" />
 	<allow pkg="org.apache.kafka.common.utils" />
-	
+
 	<subpackage name="common">
 		<disallow pkg="org.apache.kafka.clients" />
 		<allow pkg="org.apache.kafka.common" exact-match="true" />
 		<allow pkg="org.apache.kafka.test" />
-	
+
 		<subpackage name="config">
 			<allow pkg="org.apache.kafka.common.config" />
 			<!-- for testing -->
 			<allow pkg="org.apache.kafka.common.metrics" />
 		</subpackage>
-	
+
 		<subpackage name="metrics">
 			<allow pkg="org.apache.kafka.common.metrics" />
 		</subpackage>
-	
+
 		<subpackage name="network">
+			<allow pkg="org.apache.kafka.common.security.auth" />
+			<allow pkg="org.apache.kafka.common.protocol" />
+			<allow pkg="org.apache.kafka.common.config" />
 			<allow pkg="org.apache.kafka.common.metrics" />
+			<allow pkg="org.apache.kafka.common.security" />
 		</subpackage>
-	
+
+		<subpackage name="security">
+			<allow pkg="org.apache.kafka.common.network" />
+			<allow pkg="org.apache.kafka.common.config" />
+		</subpackage>
+
 		<subpackage name="protocol">
 			<allow pkg="org.apache.kafka.common.errors" />
 			<allow pkg="org.apache.kafka.common.protocol.types" />
 		</subpackage>
-	
+
 		<subpackage name="record">
 			<allow pkg="net.jpountz" />
 			<allow pkg="org.apache.kafka.common.record" />
 		</subpackage>
-	
+
 		<subpackage name="requests">
 			<allow pkg="org.apache.kafka.common.protocol" />
 			<allow pkg="org.apache.kafka.common.network" />
 			<!-- for testing -->
 			<allow pkg="org.apache.kafka.common.errors" />
 		</subpackage>
-	
+
 		<subpackage name="serialization">
 			<allow class="org.apache.kafka.common.errors.SerializationException" />
 		</subpackage>
@@ -80,15 +91,15 @@
 		<allow pkg="org.slf4j" />
 		<allow pkg="org.apache.kafka.clients" exact-match="true"/>
 		<allow pkg="org.apache.kafka.test" />
-	
+
 		<subpackage name="consumer">
 			<allow pkg="org.apache.kafka.clients.consumer" />
 		</subpackage>
-	
+
 		<subpackage name="producer">
 			<allow pkg="org.apache.kafka.clients.producer" />
 		</subpackage>
-	
+
 		<subpackage name="tools">
 			<allow pkg="org.apache.kafka.clients.producer" />
 			<allow pkg="org.apache.kafka.clients.consumer" />
@@ -106,6 +117,7 @@
 
 	<subpackage name="test">
 		<allow pkg="org.apache.kafka" />
+		<allow pkg="org.bouncycastle" />
 	</subpackage>
 
 	<subpackage name="copycat">

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
index 0d68bf1..ba3bcbe 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
@@ -16,8 +16,14 @@ import java.io.Closeable;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.network.ChannelBuilder;
+import org.apache.kafka.common.network.SSLChannelBuilder;
+import org.apache.kafka.common.network.PlaintextChannelBuilder;
+import org.apache.kafka.common.security.ssl.SSLFactory;
 import org.apache.kafka.common.config.ConfigException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -25,6 +31,7 @@ import org.slf4j.LoggerFactory;
 import static org.apache.kafka.common.utils.Utils.getHost;
 import static org.apache.kafka.common.utils.Utils.getPort;
 
+
 public class ClientUtils {
     private static final Logger log = LoggerFactory.getLogger(ClientUtils.class);
 
@@ -61,4 +68,28 @@ public class ClientUtils {
             }
         }
     }
-}
\ No newline at end of file
+
+    /**
+     * @param configs client/server configs
+     * @return a ChannelBuilder configured from the supplied configs.
+     */
+    public static ChannelBuilder createChannelBuilder(Map<String, ?> configs) {
+        SecurityProtocol securityProtocol = SecurityProtocol.valueOf((String) configs.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
+        ChannelBuilder channelBuilder = null;
+
+        switch (securityProtocol) {
+            case SSL:
+                channelBuilder = new SSLChannelBuilder(SSLFactory.Mode.CLIENT);
+                break;
+            case PLAINTEXT:
+                channelBuilder = new PlaintextChannelBuilder();
+                break;
+            default:
+                throw new ConfigException("Invalid SecurityProtocol " + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);
+        }
+
+        channelBuilder.configure(configs);
+        return channelBuilder;
+    }
+
+}
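
The dispatch above is driven entirely by the security.protocol string: it is resolved through SecurityProtocol.valueOf, and only PLAINTEXT and SSL map to a channel builder. A small illustrative Scala sketch of that resolution (not part of the diff; the real clients pass their full parsed config map to createChannelBuilder):

    import org.apache.kafka.common.protocol.SecurityProtocol

    val configured = "SSL"                               // e.g. the value of security.protocol
    val protocol = SecurityProtocol.valueOf(configured)  // unknown names throw IllegalArgumentException here
    protocol match {
      case SecurityProtocol.SSL       => println("clients build an SSLChannelBuilder(SSLFactory.Mode.CLIENT)")
      case SecurityProtocol.PLAINTEXT => println("clients build a PlaintextChannelBuilder()")
      case other                      => println(s"$other is rejected with a ConfigException")
    }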

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index 2c421f4..7d24c6f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
@@ -17,7 +17,7 @@ package org.apache.kafka.clients;
  * Some configurations shared by both producer and consumer
  */
 public class CommonClientConfigs {
-    
+
     /*
      * NOTE: DO NOT CHANGE EITHER CONFIG NAMES AS THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE.
      */
@@ -27,10 +27,10 @@ public class CommonClientConfigs {
                                                        + "<code>host1:port1,host2:port2,...</code>. Since these servers are just used for the initial connection to "
                                                        + "discover the full cluster membership (which may change dynamically), this list need not contain the full set of "
                                                        + "servers (you may want more than one, though, in case a server is down).";
-    
+
     public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms";
     public static final String METADATA_MAX_AGE_DOC = "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.";
-    
+
     public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes";
     public static final String SEND_BUFFER_DOC = "The size of the TCP send buffer (SO_SNDBUF) to use when sending data.";
 
@@ -45,7 +45,7 @@ public class CommonClientConfigs {
 
     public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
     public static final String RETRY_BACKOFF_MS_DOC = "The amount of time to wait before attempting to retry a failed fetch request to a given topic partition. This avoids repeated fetching-and-failing in a tight loop.";
-    
+
     public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms";
     public static final String METRICS_SAMPLE_WINDOW_MS_DOC = "The number of samples maintained to compute metrics.";
 
@@ -55,6 +55,10 @@ public class CommonClientConfigs {
     public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
     public static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the <code>MetricReporter</code> interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.";
 
+    public static final String SECURITY_PROTOCOL_CONFIG = "security.protocol";
+    public static final String SECURITY_PROTOCOL_DOC = "Protocol used to communicate with brokers. Currently only PLAINTEXT and SSL are supported.";
+    public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT";
+
     public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = "connections.max.idle.ms";
     public static final String CONNECTIONS_MAX_IDLE_MS_DOC = "Close idle connections after the number of milliseconds specified by this config.";
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 0e51d7b..b31f7f1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
@@ -101,7 +101,7 @@ public class NetworkClient implements KafkaClient {
 
     /**
      * Begin connecting to the given node, return true if we are already connected and ready to send to that node.
-     * 
+     *
      * @param node The node to check
      * @param now The current timestamp
      * @return True if we are ready to send to the given node
@@ -122,7 +122,7 @@ public class NetworkClient implements KafkaClient {
      * Returns the number of milliseconds to wait, based on the connection state, before attempting to send data. When
      * disconnected, this respects the reconnect backoff time. When connecting or connected, this handles slow/stalled
      * connections.
-     * 
+     *
      * @param node The node to check
      * @param now The current timestamp
      * @return The number of milliseconds to wait.
@@ -147,7 +147,7 @@ public class NetworkClient implements KafkaClient {
 
     /**
      * Check if the node with the given id is ready to send more requests.
-     * 
+     *
      * @param node The node
      * @param now The current time in ms
      * @return true if the node is ready
@@ -161,21 +161,21 @@ public class NetworkClient implements KafkaClient {
             return false;
         else
             // otherwise we are ready if we are connected and can send more requests
-            return isSendable(nodeId);
+            return canSendRequest(nodeId);
     }
 
     /**
      * Are we connected and ready and able to send more requests to the given connection?
-     * 
+     *
      * @param node The node
      */
-    private boolean isSendable(String node) {
-        return connectionStates.isConnected(node) && inFlightRequests.canSendMore(node);
+    private boolean canSendRequest(String node) {
+        return connectionStates.isConnected(node) && selector.isChannelReady(node) && inFlightRequests.canSendMore(node);
     }
 
     /**
      * Return the state of the connection to the given node
-     * 
+     *
      * @param node The node to check
      * @return The connection state
      */
@@ -185,13 +185,13 @@ public class NetworkClient implements KafkaClient {
 
     /**
      * Queue up the given request for sending. Requests can only be sent out to ready nodes.
-     * 
+     *
      * @param request The request
      */
     @Override
     public void send(ClientRequest request) {
         String nodeId = request.request().destination();
-        if (!isSendable(nodeId))
+        if (!canSendRequest(nodeId))
             throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
 
         this.inFlightRequests.add(request);
@@ -200,7 +200,7 @@ public class NetworkClient implements KafkaClient {
 
     /**
      * Do actual reads and writes to sockets.
-     * 
+     *
      * @param timeout The maximum amount of time to wait (in ms) for responses if there are none immediately
      * @param now The current time in milliseconds
      * @return The list of responses received
@@ -246,7 +246,7 @@ public class NetworkClient implements KafkaClient {
 
     /**
      * Await all the outstanding responses for requests on the given connection
-     * 
+     *
      * @param node The node to block on
      * @param now The current time in ms
      * @return All the collected responses
@@ -294,7 +294,7 @@ public class NetworkClient implements KafkaClient {
 
     /**
      * Generate a request header for the given API key
-     * 
+     *
      * @param key The api key
      * @return A request header with the appropriate client id and correlation id
      */
@@ -324,7 +324,7 @@ public class NetworkClient implements KafkaClient {
      * prefer a node with an existing connection, but will potentially choose a node for which we don't yet have a
      * connection if all existing connections are in use. This method will never choose a node for which there is no
      * existing connection and from which we have disconnected within the reconnect backoff period.
-     * 
+     *
      * @return The node with the fewest in-flight requests.
      */
     public Node leastLoadedNode(long now) {
@@ -349,7 +349,7 @@ public class NetworkClient implements KafkaClient {
 
     /**
      * Handle any completed request send. In particular if no response is expected consider the request complete.
-     * 
+     *
      * @param responses The list of responses to update
      * @param now The current time
      */
@@ -366,7 +366,7 @@ public class NetworkClient implements KafkaClient {
 
     /**
      * Handle any completed receives and update the response list with the responses received.
-     * 
+     *
      * @param responses The list of responses to update
      * @param now The current time
      */
@@ -407,7 +407,7 @@ public class NetworkClient implements KafkaClient {
 
     /**
      * Handle any disconnected connections
-     * 
+     *
      * @param responses The list of responses that completed with the disconnection
      * @param now The current time
      */
@@ -472,8 +472,7 @@ public class NetworkClient implements KafkaClient {
         }
         String nodeConnectionId = node.idString();
 
-
-        if (connectionStates.isConnected(nodeConnectionId) && inFlightRequests.canSendMore(nodeConnectionId)) {
+        if (canSendRequest(nodeConnectionId)) {
             Set<String> topics = metadata.topics();
             this.metadataFetchInProgress = true;
             ClientRequest metadataRequest = metadataRequest(now, nodeConnectionId, topics);

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index d35b421..9c9510a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
@@ -19,6 +19,7 @@ import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.config.SSLConfigs;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -153,7 +154,7 @@ public class ConsumerConfig extends AbstractConfig {
      */
     public static final String CHECK_CRCS_CONFIG = "check.crcs";
     private static final String CHECK_CRCS_DOC = "Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance.";
-    
+
     /** <code>key.deserializer</code> */
     public static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer";
     private static final String KEY_DESERIALIZER_CLASS_DOC = "Deserializer class for key that implements the <code>Deserializer</code> interface.";
@@ -267,7 +268,7 @@ public class ConsumerConfig extends AbstractConfig {
                                         Type.BOOLEAN,
                                         true,
                                         Importance.LOW,
-                                        CHECK_CRCS_DOC)                                
+                                        CHECK_CRCS_DOC)
                                 .define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
                                         Type.LONG,
                                         30000,
@@ -293,12 +294,29 @@ public class ConsumerConfig extends AbstractConfig {
                                         Type.CLASS,
                                         Importance.HIGH,
                                         VALUE_DESERIALIZER_CLASS_DOC)
+                                .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, Type.STRING, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC)
+                                .define(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Type.CLASS, SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, Importance.LOW, SSLConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
+                                .define(SSLConfigs.SSL_PROTOCOL_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_PROTOCOL, Importance.MEDIUM, SSLConfigs.SSL_PROTOCOL_DOC)
+                                .define(SSLConfigs.SSL_PROVIDER_CONFIG, Type.STRING, Importance.MEDIUM, SSLConfigs.SSL_PROVIDER_DOC, false)
+                                .define(SSLConfigs.SSL_CIPHER_SUITES_CONFIG, Type.LIST, Importance.LOW, SSLConfigs.SSL_CIPHER_SUITES_DOC, false)
+                                .define(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Type.LIST, SSLConfigs.DEFAULT_ENABLED_PROTOCOLS, Importance.MEDIUM, SSLConfigs.SSL_ENABLED_PROTOCOLS_DOC)
+                                .define(SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_KEYSTORE_TYPE, Importance.MEDIUM, SSLConfigs.SSL_KEYSTORE_TYPE_DOC)
+                                .define(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_KEYSTORE_LOCATION_DOC, false)
+                                .define(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_KEYSTORE_PASSWORD_DOC, false)
+                                .define(SSLConfigs.SSL_KEY_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_KEY_PASSWORD_DOC, false)
+                                .define(SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, Importance.MEDIUM, SSLConfigs.SSL_TRUSTSTORE_TYPE_DOC)
+                                .define(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, Type.STRING, SSLConfigs.DEFAULT_TRUSTSTORE_LOCATION, Importance.HIGH, SSLConfigs.SSL_TRUSTSTORE_LOCATION_DOC)
+                                .define(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, Type.STRING, SSLConfigs.DEFAULT_TRUSTSTORE_PASSWORD, Importance.HIGH, SSLConfigs.SSL_TRUSTSTORE_PASSWORD_DOC)
+                                .define(SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, Importance.LOW, SSLConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
+                                .define(SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, Importance.LOW, SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
+                                .define(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, Type.STRING, Importance.LOW, SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false)
                                 /* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */
                                 .define(CONNECTIONS_MAX_IDLE_MS_CONFIG,
                                         Type.LONG,
                                         9 * 60 * 1000,
                                         Importance.MEDIUM,
                                         CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC);
+
     }
 
     public static Map<String, Object> addDeserializerToConfig(Map<String, Object> configs,
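
With these definitions, the new consumer accepts the same security.protocol and ssl.* settings as the producer. A minimal sketch of a consumer configured against an SSL listener; the values are illustrative and the ssl.truststore.* key names are assumed from SSLConfigs rather than quoted from the diff.

    import java.util.Properties
    import org.apache.kafka.clients.CommonClientConfigs
    import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}

    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9093")   // assumed SSL listener address
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "ssl-example-group")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL")
    props.put("ssl.truststore.location", "/path/to/client.truststore.jks")   // assumed key name
    props.put("ssl.truststore.password", "client-truststore-password")

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)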

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index be46b6c..3749880 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
@@ -31,8 +31,9 @@ import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.MetricsReporter;
-import org.apache.kafka.common.network.Selector;
 import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.network.ChannelBuilder;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -65,7 +66,6 @@ import static org.apache.kafka.common.utils.Utils.min;
  * <p>
  * The consumer maintains TCP connections to the necessary brokers to fetch data for the topics it subscribes to.
  * Failure to close the consumer after use will leak these connections.
- * <p>
  * The consumer is not thread-safe. See <a href="#multithreaded">Multi-threaded Processing</a> for more details.
  *
  * <h3>Offsets and Consumer Position</h3>
@@ -85,9 +85,9 @@ import static org.apache.kafka.common.utils.Utils.min;
  * <p>
  * This distinction gives the consumer control over when a record is considered consumed. It is discussed in further
  * detail below.
- * 
+ *
  * <h3>Consumer Groups</h3>
- * 
+ *
  * Kafka uses the concept of <i>consumer groups</i> to allow a pool of processes to divide up the work of consuming and
  * processing records. These processes can either be running on the same machine or, as is more likely, they can be
  * distributed over many machines to provide additional scalability and fault tolerance for processing.
@@ -116,14 +116,14 @@ import static org.apache.kafka.common.utils.Utils.min;
  * <p>
  * It is also possible for the consumer to manually specify the partitions it subscribes to, which disables this dynamic
  * partition balancing.
- * 
+ *
  * <h3>Usage Examples</h3>
  * The consumer APIs offer flexibility to cover a variety of consumption use cases. Here are some examples to
  * demonstrate how to use them.
- * 
+ *
  * <h4>Simple Processing</h4>
  * This example demonstrates the simplest usage of Kafka's consumer api.
- * 
+ *
  * <pre>
  *     Properties props = new Properties();
  *     props.put(&quot;bootstrap.servers&quot;, &quot;localhost:9092&quot;);
@@ -141,7 +141,7 @@ import static org.apache.kafka.common.utils.Utils.min;
  *             System.out.printf(&quot;offset = %d, key = %s, value = %s&quot;, record.offset(), record.key(), record.value());
  *     }
  * </pre>
- * 
+ *
  * Setting <code>enable.auto.commit</code> means that offsets are committed automatically with a frequency controlled by
  * the config <code>auto.commit.interval.ms</code>.
  * <p>
@@ -161,9 +161,9 @@ import static org.apache.kafka.common.utils.Utils.min;
  * <p>
  * The deserializer settings specify how to turn bytes into objects. For example, by specifying string deserializers, we
  * are saying that our record's key and value will just be simple strings.
- * 
+ *
  * <h4>Controlling When Messages Are Considered Consumed</h4>
- * 
+ *
  * In this example we will consume a batch of records and batch them up in memory, when we have sufficient records
  * batched we will insert them into a database. If we allowed offsets to auto commit as in the previous example messages
  * would be considered consumed after they were given out by the consumer, and it would be possible that our process
@@ -175,7 +175,7 @@ import static org.apache.kafka.common.utils.Utils.min;
  * would consume from last committed offset and would repeat the insert of the last batch of data. Used in this way
  * Kafka provides what is often called "at-least once delivery" guarantees, as each message will likely be delivered one
  * time but in failure cases could be duplicated.
- * 
+ *
  * <pre>
  *     Properties props = new Properties();
  *     props.put(&quot;bootstrap.servers&quot;, &quot;localhost:9092&quot;);
@@ -201,9 +201,9 @@ import static org.apache.kafka.common.utils.Utils.min;
  *         }
  *     }
  * </pre>
- * 
+ *
  * <h4>Subscribing To Specific Partitions</h4>
- * 
+ *
  * In the previous examples we subscribed to the topics we were interested in and let Kafka give our particular process
  * a fair share of the partitions for those topics. This provides a simple load balancing mechanism so multiple
  * instances of our program can divide up the work of processing records.
@@ -223,7 +223,7 @@ import static org.apache.kafka.common.utils.Utils.min;
  * <p>
  * This mode is easy to specify; rather than subscribing to the topic, the consumer just subscribes to particular
  * partitions:
- * 
+ *
  * <pre>
  *     String topic = &quot;foo&quot;;
  *     TopicPartition partition0 = new TopicPartition(topic, 0);
@@ -231,15 +231,15 @@ import static org.apache.kafka.common.utils.Utils.min;
  *     consumer.subscribe(partition0);
  *     consumer.subscribe(partition1);
  * </pre>
- * 
+ *
  * The group that the consumer specifies is still used for committing offsets, but now the set of partitions will only
  * be changed if the consumer specifies new partitions, and no attempt at failure detection will be made.
  * <p>
  * It isn't possible to mix both subscription to specific partitions (with no load balancing) and to topics (with load
  * balancing) using the same consumer instance.
- * 
+ *
  * <h4>Managing Your Own Offsets</h4>
- * 
+ *
  * The consumer application need not use Kafka's built-in offset storage; it can store offsets in a store of its own
  * choosing. The primary use case for this is allowing the application to store both the offset and the results of the
  * consumption in the same system in a way that both the results and offsets are stored atomically. This is not always
@@ -259,14 +259,14 @@ import static org.apache.kafka.common.utils.Utils.min;
  * This means that in this case the indexing process that comes back having lost recent updates just resumes indexing
  * from what it has ensuring that no updates are lost.
  * </ul>
- * 
+ *
  * Each record comes with its own offset, so to manage your own offset you just need to do the following:
  * <ol>
  * <li>Configure <code>enable.auto.commit=false</code>
  * <li>Use the offset provided with each {@link ConsumerRecord} to save your position.
  * <li>On restart restore the position of the consumer using {@link #seek(TopicPartition, long)}.
  * </ol>
- * 
+ *
  * This type of usage is simplest when the partition assignment is also done manually (this would be likely in the
  * search index use case described above). If the partition assignment is done automatically special care will also be
  * needed to handle the case where partition assignments change. This can be handled using a special callback specified
@@ -279,9 +279,9 @@ import static org.apache.kafka.common.utils.Utils.min;
  * <p>
  * Another common use for {@link ConsumerRebalanceCallback} is to flush any caches the application maintains for
  * partitions that are moved elsewhere.
- * 
+ *
  * <h4>Controlling The Consumer's Position</h4>
- * 
+ *
  * In most use cases the consumer will simply consume records from beginning to end, periodically committing its
  * position (either automatically or manually). However, Kafka allows the consumer to manually control its position,
  * moving forward or backwards in a partition at will. This means a consumer can re-consume older records, or skip to
@@ -296,14 +296,14 @@ import static org.apache.kafka.common.utils.Utils.min;
  * the consumer will want to initialize its position on start-up to whatever is contained in the local store. Likewise
  * if the local state is destroyed (say because the disk is lost) the state may be recreated on a new machine by
  * reconsuming all the data and recreating the state (assuming that Kafka is retaining sufficient history).
- * 
+ *
  * Kafka allows specifying the position using {@link #seek(TopicPartition, long)} to specify the new position. Special
  * methods for seeking to the earliest and latest offset the server maintains are also available (
  * {@link #seekToBeginning(TopicPartition...)} and {@link #seekToEnd(TopicPartition...)} respectively).
- * 
+ *
  *
  * <h3><a name="multithreaded">Multi-threaded Processing</a></h3>
- * 
+ *
  * The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application
  * making the call. It is the responsibility of the user to ensure that multi-threaded access
  * is properly synchronized. Un-synchronized access will result in {@link ConcurrentModificationException}.
@@ -353,9 +353,9 @@ import static org.apache.kafka.common.utils.Utils.min;
  * We have intentionally avoided implementing a particular threading model for processing. This leaves several
  * options for implementing multi-threaded processing of records.
  *
- * 
+ *
  * <h4>1. One Consumer Per Thread</h4>
- * 
+ *
  * A simple option is to give each thread its own consumer instance. Here are the pros and cons of this approach:
  * <ul>
  * <li><b>PRO</b>: It is the easiest to implement
@@ -368,13 +368,13 @@ import static org.apache.kafka.common.utils.Utils.min;
  * which can cause some drop in I/O throughput.
  * <li><b>CON</b>: The number of total threads across all processes will be limited by the total number of partitions.
  * </ul>
- * 
+ *
  * <h4>2. Decouple Consumption and Processing</h4>
- * 
+ *
  * Another alternative is to have one or more consumer threads that do all data consumption and hand off
  * {@link ConsumerRecords} instances to a blocking queue consumed by a pool of processor threads that actually handle
  * the record processing.
- * 
+ *
  * This option likewise has pros and cons:
  * <ul>
  * <li><b>PRO</b>: This option allows independently scaling the number of consumers and processors. This makes it
@@ -385,11 +385,11 @@ import static org.apache.kafka.common.utils.Utils.min;
  * <li><b>CON</b>: Manually committing the position becomes harder as it requires that all threads co-ordinate to ensure
  * that processing is complete for that partition.
  * </ul>
- * 
+ *
  * There are many possible variations on this approach. For example, each processor thread can have its own queue, and
  * the consumer threads can hash into these queues using the TopicPartition to ensure in-order consumption and simplify
  * commit.
- * 
+ *
  */
 @InterfaceStability.Unstable
 public class KafkaConsumer<K, V> implements Consumer<K, V> {
@@ -430,7 +430,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * string "42" or the integer 42).
      * <p>
      * Valid configuration strings are documented at {@link ConsumerConfig}
-     * 
+     *
      * @param configs The consumer configs
      */
     public KafkaConsumer(Map<String, Object> configs) {
@@ -442,7 +442,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * {@link ConsumerRebalanceCallback} implementation, a key and a value {@link Deserializer}.
      * <p>
      * Valid configuration strings are documented at {@link ConsumerConfig}
-     * 
+     *
      * @param configs The consumer configs
      * @param callback A callback interface that the user can implement to manage customized offsets on the start and
      *            end of every rebalance operation.
@@ -476,7 +476,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * {@link ConsumerRebalanceCallback} implementation, a key and a value {@link Deserializer}.
      * <p>
      * Valid configuration strings are documented at {@link ConsumerConfig}
-     * 
+     *
      * @param properties The consumer configuration properties
      * @param callback A callback interface that the user can implement to manage customized offsets on the start and
      *            end of every rebalance operation.
@@ -524,12 +524,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG));
             List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
             this.metadata.update(Cluster.bootstrap(addresses), 0);
-
             String metricGrpPrefix = "consumer";
             Map<String, String> metricsTags = new LinkedHashMap<String, String>();
             metricsTags.put("client-id", clientId);
+            ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
             NetworkClient netClient = new NetworkClient(
-                    new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, metricsTags),
+                    new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, metricsTags, channelBuilder),
                     this.metadata,
                     clientId,
                     100, // a fixed large enough value will suffice
@@ -623,7 +623,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * <li>An existing member of the consumer group dies
      * <li>A new member is added to an existing consumer group via the join API
      * </ul>
-     * 
+     *
      * @param topics A variable list of topics that the consumer wants to subscribe to
      */
     @Override
@@ -664,7 +664,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     /**
      * Unsubscribe from the specific topics. This will trigger a rebalance operation and records for this topic will not
      * be returned from the next {@link #poll(long) poll()} onwards
-     * 
+     *
      * @param topics Topics to unsubscribe from
      */
     public void unsubscribe(String... topics) {
@@ -682,7 +682,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     /**
      * Unsubscribe from the specific topic partitions. Records for these partitions will not be returned from the next
      * {@link #poll(long) poll()} onwards
-     * 
+     *
      * @param partitions Partitions to unsubscribe from
      */
     public void unsubscribe(TopicPartition... partitions) {
@@ -705,11 +705,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * If {@link #seek(TopicPartition, long)} is used, it will use the specified offsets on startup and on every
      * rebalance, to consume data from that offset sequentially on every poll. If not, it will use the last checkpointed
      * offset using {@link #commit(Map, CommitType) commit(offsets, sync)} for the subscribed list of partitions.
-     * 
+     *
      * @param timeout The time, in milliseconds, spent waiting in poll if data is not available. If 0, returns
      *            immediately with any records available now. Must not be negative.
      * @return map of topic to records since the last fetch for the subscribed list of topics and partitions
-     * 
+     *
      * @throws NoOffsetForPartitionException If there is no stored offset for a subscribed partition and no automatic
      *             offset reset policy has been configured.
      */
@@ -932,7 +932,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
 
     /**
      * Returns the offset of the <i>next record</i> that will be fetched (if a record with that offset exists).
-     * 
+     *
      * @param partition The partition to get the position for
      * @return The offset
      * @throws NoOffsetForPartitionException If a position hasn't been set for a given partition, and no reset policy is
@@ -961,7 +961,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * <p>
      * This call may block to do a remote call if the partition in question isn't assigned to this consumer or if the
      * consumer hasn't yet initialized its cache of committed offsets.
-     * 
+     *
      * @param partition The partition to check
      * @return The last committed offset
      * @throws NoOffsetForPartitionException If no offset has ever been committed by any process for the given
@@ -1003,7 +1003,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     /**
      * Get metadata about the partitions for a given topic. This method will issue a remote call to the server if it
      * does not already have any metadata about the given topic.
-     * 
+     *
      * @param topic The topic to get partition metadata for
      * @return The list of partitions
      */
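
A minimal sketch (not part of this patch) of how the SSL support wired in above is expected to surface to consumer users, assuming ConsumerConfig registers the same security.protocol and ssl.* configs that ProducerConfig defines below; the port, truststore path and password are placeholders:

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9093");   // assumed SSL listener port
    props.put("group.id", "test");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("security.protocol", "SSL");
    props.put("ssl.truststore.location", "/path/to/client.truststore.jks");   // placeholder path
    props.put("ssl.truststore.password", "changeit");                         // placeholder password
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);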

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 03b8dd2..c4621e2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -43,6 +43,7 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.network.ChannelBuilder;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.Records;
@@ -226,9 +227,9 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                     metricTags);
             List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
             this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
-
+            ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
             NetworkClient client = new NetworkClient(
-                    new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", metricTags),
+                    new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", metricTags, channelBuilder),
                     this.metadata,
                     clientId,
                     config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
@@ -305,7 +306,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      * <p>
      * Since the send call is asynchronous it returns a {@link java.util.concurrent.Future Future} for the
      * {@link RecordMetadata} that will be assigned to this record. Invoking {@link java.util.concurrent.Future#get()
-     * get()} on this future will block until the associated request completes and then return the metadata for the record 
+     * get()} on this future will block until the associated request completes and then return the metadata for the record
      * or throw any exception that occurred while sending the record.
      * <p>
      * If you want to simulate a simple blocking call you can call the <code>get()</code> method immediately:

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index aa26420..06f00a9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -22,6 +22,7 @@ import java.util.Properties;
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.SSLConfigs;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
@@ -225,9 +226,26 @@ public class ProducerConfig extends AbstractConfig {
                                         MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC)
                                 .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, KEY_SERIALIZER_CLASS_DOC)
                                 .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC)
+                                .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, Type.STRING, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC)
+                                .define(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Type.CLASS, SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, Importance.LOW, SSLConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
+                                .define(SSLConfigs.SSL_PROTOCOL_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_PROTOCOL, Importance.MEDIUM, SSLConfigs.SSL_PROTOCOL_DOC)
+                                .define(SSLConfigs.SSL_PROVIDER_CONFIG, Type.STRING, Importance.MEDIUM, SSLConfigs.SSL_PROVIDER_DOC, false)
+                                .define(SSLConfigs.SSL_CIPHER_SUITES_CONFIG, Type.LIST, Importance.LOW, SSLConfigs.SSL_CIPHER_SUITES_DOC, false)
+                                .define(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Type.LIST, SSLConfigs.DEFAULT_ENABLED_PROTOCOLS, Importance.MEDIUM, SSLConfigs.SSL_ENABLED_PROTOCOLS_DOC)
+                                .define(SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_KEYSTORE_TYPE, Importance.MEDIUM, SSLConfigs.SSL_KEYSTORE_TYPE_DOC)
+                                .define(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_KEYSTORE_LOCATION_DOC, false)
+                                .define(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_KEYSTORE_PASSWORD_DOC, false)
+                                .define(SSLConfigs.SSL_KEY_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_KEY_PASSWORD_DOC, false)
+                                .define(SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, Importance.MEDIUM, SSLConfigs.SSL_TRUSTSTORE_TYPE_DOC)
+                                .define(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, Type.STRING, SSLConfigs.DEFAULT_TRUSTSTORE_LOCATION, Importance.HIGH, SSLConfigs.SSL_TRUSTSTORE_LOCATION_DOC)
+                                .define(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, Type.STRING, SSLConfigs.DEFAULT_TRUSTSTORE_PASSWORD, Importance.HIGH, SSLConfigs.SSL_TRUSTSTORE_PASSWORD_DOC)
+                                .define(SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, Importance.LOW, SSLConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
+                                .define(SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, Importance.LOW, SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
+                                .define(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, Type.STRING, Importance.LOW, SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false)
                                 /* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */
                                 .define(CONNECTIONS_MAX_IDLE_MS_CONFIG, Type.LONG, 9 * 60 * 1000, Importance.MEDIUM, CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
                                 .define(PARTITIONER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.producer.internals.DefaultPartitioner", Importance.MEDIUM, PARTITIONER_CLASS_DOC);
+
     }
 
     public static Map<String, Object> addSerializerToConfig(Map<String, Object> configs,
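
A hedged sketch (not part of the patch) of a producer using the SSL configs registered above, including the optional keystore settings for two-way authentication; all paths, passwords and the port are placeholders:

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9093");   // assumed SSL listener port
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("security.protocol", "SSL");
    props.put("ssl.truststore.location", "/path/to/client.truststore.jks");   // placeholder
    props.put("ssl.truststore.password", "changeit");                         // placeholder
    // keystore settings are only needed when the broker is configured with ssl.client.auth=required or requested
    props.put("ssl.keystore.location", "/path/to/client.keystore.jks");       // placeholder
    props.put("ssl.keystore.password", "changeit");                           // placeholder
    props.put("ssl.key.password", "changeit");                                // placeholder
    KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);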

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 6c31748..156ec14 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
@@ -107,6 +107,10 @@ public class AbstractConfig {
         return copy;
     }
 
+    public Map<String, ?> values() {
+        return new HashMap<String, Object>(values);
+    }
+
     private void logAll() {
         StringBuilder b = new StringBuilder();
         b.append(getClass().getSimpleName());
@@ -133,7 +137,7 @@ public class AbstractConfig {
     /**
      * Get a configured instance of the give class specified by the given configuration key. If the object implements
      * Configurable configure it using the configuration.
-     * 
+     *
      * @param key The configuration key for the class
      * @param t The interface the class should implement
      * @return A configured instance of the class

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java
new file mode 100644
index 0000000..dd7b71a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common.config;
+
+import javax.net.ssl.TrustManagerFactory;
+import javax.net.ssl.KeyManagerFactory;
+
+public class SSLConfigs {
+    /*
+     * NOTE: DO NOT CHANGE THESE CONFIG NAMES AS THEY ARE PART OF THE PUBLIC API AND A CHANGE WILL BREAK USER CODE.
+     */
+
+    public static final String PRINCIPAL_BUILDER_CLASS_CONFIG = "principal.builder.class";
+    public static final String PRINCIPAL_BUILDER_CLASS_DOC = "Principal builder class used to generate a java.security.Principal. This config is optional for clients.";
+    public static final String DEFAULT_PRINCIPAL_BUILDER_CLASS = "org.apache.kafka.common.security.auth.DefaultPrincipalBuilder";
+
+    public static final String SSL_PROTOCOL_CONFIG = "ssl.protocol";
+    public static final String SSL_PROTOCOL_DOC = "The SSL protocol used to generate the SSLContext. "
+            + "Default setting is TLS. Allowed values are SSL, SSLv2, SSLv3, TLS, TLSv1.1, TLSv1.2";
+    public static final String DEFAULT_SSL_PROTOCOL = "TLS";
+
+    public static final String SSL_PROVIDER_CONFIG = "ssl.provider";
+    public static final String SSL_PROVIDER_DOC = "The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.";
+
+    public static final String SSL_CIPHER_SUITES_CONFIG = "ssl.cipher.suites";
+    public static final String SSL_CIPHER_SUITES_DOC = "A cipher suite is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using the TLS or SSL network protocol. "
+            + "By default all the available cipher suites are supported.";
+
+    public static final String SSL_ENABLED_PROTOCOLS_CONFIG = "ssl.enabled.protocols";
+    public static final String SSL_ENABLED_PROTOCOLS_DOC = "The list of protocols enabled for SSL connections. "
+            + "All versions of TLS are enabled by default.";
+    public static final String DEFAULT_ENABLED_PROTOCOLS = "TLSv1.2,TLSv1.1,TLSv1";
+
+    public static final String SSL_KEYSTORE_TYPE_CONFIG = "ssl.keystore.type";
+    public static final String SSL_KEYSTORE_TYPE_DOC = "The file format of the key store file. "
+            + "This is optional for client. Default value is JKS";
+    public static final String DEFAULT_SSL_KEYSTORE_TYPE = "JKS";
+
+    public static final String SSL_KEYSTORE_LOCATION_CONFIG = "ssl.keystore.location";
+    public static final String SSL_KEYSTORE_LOCATION_DOC = "The location of the key store file. "
+        + "This is optional for clients and can be used for two-way client authentication.";
+
+    public static final String SSL_KEYSTORE_PASSWORD_CONFIG = "ssl.keystore.password";
+    public static final String SSL_KEYSTORE_PASSWORD_DOC = "The store password for the key store file. "
+        + "This is optional for clients and only needed if ssl.keystore.location is configured.";
+
+    public static final String SSL_KEY_PASSWORD_CONFIG = "ssl.key.password";
+    public static final String SSL_KEY_PASSWORD_DOC = "The password of the private key in the key store file. "
+            + "This is optional for client.";
+
+    public static final String SSL_TRUSTSTORE_TYPE_CONFIG = "ssl.truststore.type";
+    public static final String SSL_TRUSTSTORE_TYPE_DOC = "The file format of the trust store file. "
+            + "Default value is JKS.";
+    public static final String DEFAULT_SSL_TRUSTSTORE_TYPE = "JKS";
+
+    public static final String SSL_TRUSTSTORE_LOCATION_CONFIG = "ssl.truststore.location";
+    public static final String SSL_TRUSTSTORE_LOCATION_DOC = "The location of the trust store file. ";
+    public static final String DEFAULT_TRUSTSTORE_LOCATION = "/tmp/ssl.truststore.jks";
+
+    public static final String SSL_TRUSTSTORE_PASSWORD_CONFIG = "ssl.truststore.password";
+    public static final String SSL_TRUSTSTORE_PASSWORD_DOC = "The password for the trust store file. ";
+    public static final String DEFAULT_TRUSTSTORE_PASSWORD = "truststore_password";
+
+    public static final String SSL_KEYMANAGER_ALGORITHM_CONFIG = "ssl.keymanager.algorithm";
+    public static final String SSL_KEYMANAGER_ALGORITHM_DOC = "The algorithm used by key manager factory for SSL connections. "
+            + "Default value is the key manager factory algorithm configured for the Java Virtual Machine.";
+    public static final String DEFAULT_SSL_KEYMANGER_ALGORITHM = KeyManagerFactory.getDefaultAlgorithm();
+
+    public static final String SSL_TRUSTMANAGER_ALGORITHM_CONFIG = "ssl.trustmanager.algorithm";
+    public static final String SSL_TRUSTMANAGER_ALGORITHM_DOC = "The algorithm used by trust manager factory for SSL connections. "
+            + "Default value is the trust manager factory algorithm configured for the Java Virtual Machine.";
+    public static final String DEFAULT_SSL_TRUSTMANAGER_ALGORITHM = TrustManagerFactory.getDefaultAlgorithm();
+
+    public static final String SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG = "ssl.endpoint.identification.algorithm";
+    public static final String SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC = "The endpoint identification algorithm used to validate the server hostname using the server certificate.";
+
+    public static final String SSL_CLIENT_AUTH_CONFIG = "ssl.client.auth";
+    public static final String SSL_CLIENT_AUTH_DOC = "Configures the kafka broker to request client authentication."
+                                           + " The following settings are common: "
+                                           + " <ul>"
+                                           + " <li><code>ssl.client.auth=required</code> If set to required"
+                                           + " client authentication is required."
+                                           + " <li><code>ssl.client.auth=requested</code> This means client authentication is optional."
+                                           + " Unlike required, if this option is set the client can choose not to provide authentication information about itself."
+                                           + " <li><code>ssl.client.auth=none</code> This means client authentication is not needed.";
+
+    public static final String SSL_NEED_CLIENT_AUTH_DOC = "Whether client authentication is required by the broker. "
+        + "Default value is false.";
+    public static final Boolean DEFAULT_SSL_NEED_CLIENT_AUTH = false;
+
+}
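
A small illustrative sketch (not from the patch) of building a client-side SSL config map with the constants defined above; the truststore path and password are placeholders:

    Map<String, Object> sslConfigs = new HashMap<String, Object>();
    sslConfigs.put(SSLConfigs.SSL_PROTOCOL_CONFIG, SSLConfigs.DEFAULT_SSL_PROTOCOL);                 // "TLS"
    sslConfigs.put(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, SSLConfigs.DEFAULT_ENABLED_PROTOCOLS);   // "TLSv1.2,TLSv1.1,TLSv1"
    sslConfigs.put(SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, SSLConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE);   // "JKS"
    sslConfigs.put(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/client.truststore.jks");     // placeholder
    sslConfigs.put(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "changeit");                           // placeholder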

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java b/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java
new file mode 100644
index 0000000..261f571
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.network;
+
+/**
+ * Authentication for Channel
+ */
+
+import java.io.IOException;
+import java.security.Principal;
+
+import org.apache.kafka.common.security.auth.PrincipalBuilder;
+import org.apache.kafka.common.KafkaException;
+
+public interface Authenticator {
+
+    /**
+     * Configures this Authenticator using the given TransportLayer and PrincipalBuilder.
+     * @param transportLayer the transport layer used to read or write tokens
+     * @param principalBuilder the principal builder used to build the Principal for this connection
+     */
+    void configure(TransportLayer transportLayer, PrincipalBuilder principalBuilder);
+
+    /**
+     * Implements any authentication mechanism. Uses the transportLayer to read or write tokens.
+     * Returns immediately if no further authentication needs to be done.
+     */
+    void authenticate() throws IOException;
+
+    /**
+     * Returns Principal using PrincipalBuilder
+     */
+    Principal principal() throws KafkaException;
+
+    /**
+     * Returns true if authentication is complete, otherwise returns false.
+     */
+    boolean complete();
+
+    /**
+     * Closes this Authenticator
+     *
+     * @throws IOException if any I/O error occurs
+     */
+    void close() throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
index df0e6d5..d7357b2 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
@@ -26,6 +26,7 @@ public class ByteBufferSend implements Send {
     protected final ByteBuffer[] buffers;
     private int remaining;
     private int size;
+    private boolean pending = false;
 
     public ByteBufferSend(String destination, ByteBuffer... buffers) {
         super();
@@ -43,7 +44,7 @@ public class ByteBufferSend implements Send {
 
     @Override
     public boolean completed() {
-        return remaining <= 0;
+        return remaining <= 0 && !pending;
     }
 
     @Override
@@ -57,6 +58,12 @@ public class ByteBufferSend implements Send {
         if (written < 0)
             throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
         remaining -= written;
+        // This is a temporary workaround while the Send and Receive interfaces are still used by BlockingChannel.
+        // Once BlockingChannel is removed we can make Send and Receive work with TransportLayer rather than
+        // GatheringByteChannel or ScatteringByteChannel.
+        if (channel instanceof TransportLayer)
+            pending = ((TransportLayer) channel).hasPendingWrites();
+
         return written;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java
new file mode 100644
index 0000000..52a7aab
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import java.util.Map;
+import java.nio.channels.SelectionKey;
+
+import org.apache.kafka.common.KafkaException;
+
+/**
+ * A ChannelBuilder interface to build Channel based on configs
+ */
+public interface ChannelBuilder {
+
+    /**
+     * Configure this class with the given key-value pairs
+     */
+    void configure(Map<String, ?> configs) throws KafkaException;
+
+
+    /**
+     * Returns a KafkaChannel with TransportLayer and Authenticator configured.
+     * @param  id  channel id
+     * @param  key SelectionKey
+     * @param  maxReceiveSize maximum size of a single network receive
+     */
+    KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException;
+
+
+    /**
+     * Closes ChannelBuilder
+     */
+    void close();
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java
new file mode 100644
index 0000000..813a4aa
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.network;
+
+import java.security.Principal;
+import java.io.IOException;
+
+import org.apache.kafka.common.security.auth.PrincipalBuilder;
+import org.apache.kafka.common.KafkaException;
+
+public class DefaultAuthenticator implements Authenticator {
+
+    private TransportLayer transportLayer;
+    private PrincipalBuilder principalBuilder;
+    private Principal principal;
+
+    public void configure(TransportLayer transportLayer, PrincipalBuilder principalBuilder) {
+        this.transportLayer = transportLayer;
+        this.principalBuilder = principalBuilder;
+    }
+
+    /**
+     * No-Op for default authenticator
+     */
+    public void authenticate() throws IOException {}
+
+    /**
+     * Constructs Principal using configured principalBuilder.
+     * @return Principal
+     * @throws KafkaException
+     */
+    public Principal principal() throws KafkaException {
+        if (principal == null)
+            principal = principalBuilder.buildPrincipal(transportLayer, this);
+        return principal;
+    }
+
+    public void close() throws IOException {}
+
+    /**
+     * DefaultAuthenticator doesn't implement any additional authentication mechanism.
+     * @return true
+     */
+    public boolean complete() {
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
new file mode 100644
index 0000000..28a4f41
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.network;
+
+
+import java.io.IOException;
+
+import java.net.Socket;
+import java.nio.channels.SelectionKey;
+
+import java.security.Principal;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class KafkaChannel {
+    private static final Logger log = LoggerFactory.getLogger(KafkaChannel.class);
+    private final String id;
+    private TransportLayer transportLayer;
+    private Authenticator authenticator;
+    private NetworkReceive receive;
+    private Send send;
+    private int maxReceiveSize;
+
+    public KafkaChannel(String id, TransportLayer transportLayer, Authenticator authenticator, int maxReceiveSize) throws IOException {
+        this.id = id;
+        this.transportLayer = transportLayer;
+        this.authenticator = authenticator;
+        this.maxReceiveSize = maxReceiveSize;
+    }
+
+    public void close() throws IOException {
+        transportLayer.close();
+        authenticator.close();
+    }
+
+    /**
+     * Returns the user principal for the session.
+     * In the case of PLAINTEXT with no authentication, returns ANONYMOUS as the user principal.
+     * If SSL is used without any SASL authentication, returns SSLSession.peerPrincipal.
+     */
+    public Principal principal() throws IOException {
+        return authenticator.principal();
+    }
+
+    /**
+     * Performs the transportLayer handshake and authentication using the configured authenticator.
+     */
+    public void prepare() throws IOException {
+        if (transportLayer.ready() && authenticator.complete())
+            return;
+        if (!transportLayer.ready())
+            transportLayer.handshake();
+        if (transportLayer.ready() && !authenticator.complete())
+            authenticator.authenticate();
+    }
+
+    public void disconnect() {
+        transportLayer.disconnect();
+    }
+
+
+    public void finishConnect() throws IOException {
+        transportLayer.finishConnect();
+    }
+
+    public boolean isConnected() {
+        return transportLayer.isConnected();
+    }
+
+    public String id() {
+        return id;
+    }
+
+    public void mute() {
+        transportLayer.removeInterestOps(SelectionKey.OP_READ);
+    }
+
+    public void unmute() {
+        transportLayer.addInterestOps(SelectionKey.OP_READ);
+    }
+
+    public boolean isMute() {
+        return transportLayer.isMute();
+    }
+
+    public boolean ready() {
+        return transportLayer.ready() && authenticator.complete();
+    }
+
+    public boolean hasSend() {
+        return send != null;
+    }
+
+    public String socketDescription() {
+        Socket socket = transportLayer.socketChannel().socket();
+        if (socket == null)
+            return "[unconnected socket]";
+        else if (socket.getInetAddress() != null)
+            return socket.getInetAddress().toString();
+        else
+            return socket.getLocalAddress().toString();
+    }
+
+    public void setSend(Send send) {
+        if (this.send != null)
+            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
+        this.send = send;
+        this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
+    }
+
+    public NetworkReceive read() throws IOException {
+        NetworkReceive result = null;
+
+        if (receive == null) {
+            receive = new NetworkReceive(maxReceiveSize, id);
+        }
+
+        long x = receive(receive);
+        if (receive.complete()) {
+            receive.payload().rewind();
+            result = receive;
+            receive = null;
+        }
+        return result;
+    }
+
+    public Send write() throws IOException {
+        Send result = null;
+        if (send != null && send(send)) {
+            result = send;
+            send = null;
+        }
+        return result;
+    }
+
+    private long receive(NetworkReceive receive) throws IOException {
+        long result = receive.readFrom(transportLayer);
+        return result;
+    }
+
+    private boolean send(Send send) throws IOException {
+        send.writeTo(transportLayer);
+        if (send.completed())
+            transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
+
+        return send.completed();
+    }
+
+}
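
A simplified, hypothetical sketch of how a selection loop might drive the KafkaChannel above, based only on the methods shown here (presumably close to what the patched Selector does, with extra bookkeeping); the hand-off points are left as comments:

    // illustrative only: drive a single KafkaChannel whose SelectionKey fired
    void pollChannel(KafkaChannel channel) throws IOException {
        if (!channel.ready()) {
            // transport-layer handshake plus authentication; may need to run over several poll cycles
            channel.prepare();
            return;
        }
        // read() returns null until a complete size-delimited payload has been received
        NetworkReceive receive = channel.read();
        if (receive != null) {
            // hand the completed receive to the request/response layer here
        }
        // write() returns the in-flight Send once it has fully drained, null otherwise
        Send completedSend = channel.write();
        if (completedSend != null) {
            // record the completed send here
        }
    }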

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
index 3ca0098..2a1568e 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
@@ -89,6 +89,7 @@ public class NetworkReceive implements Receive {
                     throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
                 if (maxSize != UNLIMITED && receiveSize > maxSize)
                     throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
+
                 this.buffer = ByteBuffer.allocate(receiveSize);
             }
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
new file mode 100644
index 0000000..76dbf93
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import java.nio.channels.SelectionKey;
+import java.util.Map;
+
+import org.apache.kafka.common.security.auth.PrincipalBuilder;
+import org.apache.kafka.common.config.SSLConfigs;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.KafkaException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class PlaintextChannelBuilder implements ChannelBuilder {
+    private static final Logger log = LoggerFactory.getLogger(PlaintextChannelBuilder.class);
+    private PrincipalBuilder principalBuilder;
+
+    public void configure(Map<String, ?> configs) throws KafkaException {
+        try {
+            this.principalBuilder = (PrincipalBuilder) Utils.newInstance((Class<?>) configs.get(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG));
+            this.principalBuilder.configure(configs);
+        } catch (Exception e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException {
+        KafkaChannel channel = null;
+        try {
+            PlaintextTransportLayer transportLayer = new PlaintextTransportLayer(key);
+            Authenticator authenticator = new DefaultAuthenticator();
+            authenticator.configure(transportLayer, this.principalBuilder);
+            channel = new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
+        } catch (Exception e) {
+            log.warn("Failed to create channel due to ", e);
+            throw new KafkaException(e);
+        }
+        return channel;
+    }
+
+    public void close() {
+        this.principalBuilder.close();
+    }
+
+}


http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/security/ssl/SSLFactory.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SSLFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SSLFactory.java
new file mode 100644
index 0000000..f79b65c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SSLFactory.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.ssl;
+
+import java.util.Map;
+import java.util.List;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+
+import javax.net.ssl.*;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.config.SSLConfigs;
+
+
+public class SSLFactory implements Configurable {
+
+    public enum Mode { CLIENT, SERVER };
+    private String protocol;
+    private String provider;
+    private String kmfAlgorithm;
+    private String tmfAlgorithm;
+    private SecurityStore keystore = null;
+    private String keyPassword;
+    private SecurityStore truststore;
+    private String[] cipherSuites;
+    private String[] enabledProtocols;
+    private String endpointIdentification;
+    private SSLContext sslContext;
+    private boolean needClientAuth;
+    private boolean wantClientAuth;
+    private final Mode mode;
+
+
+    public SSLFactory(Mode mode) {
+        this.mode = mode;
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) throws KafkaException {
+        this.protocol =  (String) configs.get(SSLConfigs.SSL_PROTOCOL_CONFIG);
+        this.provider = (String) configs.get(SSLConfigs.SSL_PROVIDER_CONFIG);
+
+        if (configs.get(SSLConfigs.SSL_CIPHER_SUITES_CONFIG) != null) {
+            List<String> cipherSuitesList = (List<String>) configs.get(SSLConfigs.SSL_CIPHER_SUITES_CONFIG);
+            this.cipherSuites = (String[]) cipherSuitesList.toArray(new String[cipherSuitesList.size()]);
+        }
+
+        if (configs.get(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG) != null) {
+            List<String> enabledProtocolsList = (List<String>) configs.get(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG);
+            this.enabledProtocols =  (String[]) enabledProtocolsList.toArray(new String[enabledProtocolsList.size()]);
+        }
+
+        if (configs.containsKey(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG)) {
+            this.endpointIdentification = (String) configs.get(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG);
+        }
+
+        if (configs.containsKey(SSLConfigs.SSL_CLIENT_AUTH_CONFIG)) {
+            String clientAuthConfig = (String) configs.get(SSLConfigs.SSL_CLIENT_AUTH_CONFIG);
+            if (clientAuthConfig.equals("required"))
+                this.needClientAuth = true;
+            else if (clientAuthConfig.equals("requested"))
+                this.wantClientAuth = true;
+        }
+
+        this.kmfAlgorithm = (String) configs.get(SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG);
+        this.tmfAlgorithm = (String) configs.get(SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG);
+
+        if (checkKeyStoreConfigs(configs)) {
+            createKeystore((String) configs.get(SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG),
+                           (String) configs.get(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG),
+                           (String) configs.get(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG),
+                           (String) configs.get(SSLConfigs.SSL_KEY_PASSWORD_CONFIG));
+        }
+
+        createTruststore((String) configs.get(SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG),
+                         (String) configs.get(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG),
+                         (String) configs.get(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
+        try {
+            this.sslContext = createSSLContext();
+        } catch (Exception e) {
+            throw new KafkaException(e);
+        }
+    }
+
+
+    private SSLContext createSSLContext() throws GeneralSecurityException, IOException  {
+        SSLContext sslContext;
+        if (provider != null)
+            sslContext = SSLContext.getInstance(protocol, provider);
+        else
+            sslContext = SSLContext.getInstance(protocol);
+
+        KeyManager[] keyManagers = null;
+        if (keystore != null) {
+            String kmfAlgorithm = this.kmfAlgorithm != null ? this.kmfAlgorithm : KeyManagerFactory.getDefaultAlgorithm();
+            KeyManagerFactory kmf = KeyManagerFactory.getInstance(kmfAlgorithm);
+            KeyStore ks = keystore.load();
+            String keyPassword = this.keyPassword != null ? this.keyPassword : keystore.password;
+            kmf.init(ks, keyPassword.toCharArray());
+            keyManagers = kmf.getKeyManagers();
+        }
+
+        String tmfAlgorithm = this.tmfAlgorithm != null ? this.tmfAlgorithm : TrustManagerFactory.getDefaultAlgorithm();
+        TrustManagerFactory tmf = TrustManagerFactory.getInstance(tmfAlgorithm);
+        KeyStore ts = truststore == null ? null : truststore.load();
+        tmf.init(ts);
+
+        sslContext.init(keyManagers, tmf.getTrustManagers(), null);
+        return sslContext;
+    }
+
+    public SSLEngine createSSLEngine(String peerHost, int peerPort) {
+        SSLEngine sslEngine = sslContext.createSSLEngine(peerHost, peerPort);
+        if (cipherSuites != null) sslEngine.setEnabledCipherSuites(cipherSuites);
+        if (enabledProtocols != null) sslEngine.setEnabledProtocols(enabledProtocols);
+
+        if (mode == Mode.SERVER) {
+            sslEngine.setUseClientMode(false);
+            if (needClientAuth)
+                sslEngine.setNeedClientAuth(needClientAuth);
+            else
+                sslEngine.setWantClientAuth(wantClientAuth);
+        } else {
+            sslEngine.setUseClientMode(true);
+            SSLParameters sslParams = sslEngine.getSSLParameters();
+            sslParams.setEndpointIdentificationAlgorithm(endpointIdentification);
+            sslEngine.setSSLParameters(sslParams);
+        }
+        return sslEngine;
+    }
+
+    /**
+     * Returns a configured SSLContext.
+     * @return SSLContext.
+     */
+    public SSLContext sslContext() {
+        return sslContext;
+    }
+
+    private void createKeystore(String type, String path, String password, String keyPassword) {
+        if (path == null && password != null) {
+            throw new KafkaException("SSL key store password is not specified.");
+        } else if (path != null && password == null) {
+            throw new KafkaException("SSL key store is not specified, but key store password is specified.");
+        } else if (path != null && password != null) {
+            this.keystore = new SecurityStore(type, path, password);
+            this.keyPassword = keyPassword;
+        }
+    }
+
+    private void createTruststore(String type, String path, String password) {
+        if (path == null && password != null) {
+            throw new KafkaException("SSL key store password is not specified.");
+        } else if (path != null && password == null) {
+            throw new KafkaException("SSL key store is not specified, but key store password is specified.");
+        } else if (path != null && password != null) {
+            this.truststore = new SecurityStore(type, path, password);
+        }
+    }
+
+    private boolean checkKeyStoreConfigs(Map<String, ?> configs) {
+        return  configs.containsKey(SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG) &&
+                configs.containsKey(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG) &&
+                configs.containsKey(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG) &&
+                configs.containsKey(SSLConfigs.SSL_KEY_PASSWORD_CONFIG);
+    }
+
+    private class SecurityStore {
+        private final String type;
+        private final String path;
+        private final String password;
+
+        private SecurityStore(String type, String path, String password) {
+            this.type = type == null ? KeyStore.getDefaultType() : type;
+            this.path = path;
+            this.password = password;
+        }
+
+        private KeyStore load() throws GeneralSecurityException, IOException {
+            FileInputStream in = null;
+            try {
+                KeyStore ks = KeyStore.getInstance(type);
+                in = new FileInputStream(path);
+                ks.load(in, password.toCharArray());
+                return ks;
+            } finally {
+                if (in != null) in.close();
+            }
+        }
+    }
+
+}
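
For reference, a minimal usage sketch (not part of the patch) showing how the new SSLFactory might be wired up in client mode. The class name, paths and passwords are placeholders, and SSL_PROTOCOL_CONFIG is set explicitly on the assumption that a plain map, unlike an AbstractConfig, carries no defaults:

    // Hypothetical sketch; not part of this commit.
    import java.util.HashMap;
    import java.util.Map;
    import javax.net.ssl.SSLEngine;
    import org.apache.kafka.common.config.SSLConfigs;
    import org.apache.kafka.common.security.ssl.SSLFactory;

    public class SSLFactoryUsageSketch {
        public static void main(String[] args) throws Exception {
            Map<String, Object> configs = new HashMap<String, Object>();
            configs.put(SSLConfigs.SSL_PROTOCOL_CONFIG, "TLS");
            // A client that does not present a certificate only needs a truststore.
            configs.put(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/tmp/client.truststore.jks");
            configs.put(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "truststore-password");
            SSLFactory sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT);
            sslFactory.configure(configs);
            // Host and port are hints used for session caching and endpoint identification.
            SSLEngine engine = sslFactory.createSSLEngine("localhost", 9093);
            System.out.println("client mode: " + engine.getUseClientMode());
        }
    }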

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 80a914e..c58b741 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
@@ -48,7 +48,7 @@ public class Utils {
 
     /**
      * Turn the given UTF8 byte array into a string
-     * 
+     *
      * @param bytes The byte array
      * @return The string
      */
@@ -62,7 +62,7 @@ public class Utils {
 
     /**
      * Turn a string into a utf8 byte[]
-     * 
+     *
      * @param string The string
      * @return The byte[]
      */
@@ -76,7 +76,7 @@ public class Utils {
 
     /**
      * Read an unsigned integer from the current position in the buffer, incrementing the position by 4 bytes
-     * 
+     *
      * @param buffer The buffer to read from
      * @return The integer read, as a long to avoid signedness
      */
@@ -86,7 +86,7 @@ public class Utils {
 
     /**
      * Read an unsigned integer from the given position without modifying the buffers position
-     * 
+     *
      * @param buffer the buffer to read from
      * @param index the index from which to read the integer
      * @return The integer read, as a long to avoid signedness
@@ -97,12 +97,12 @@ public class Utils {
 
     /**
      * Read an unsigned integer stored in little-endian format from the {@link InputStream}.
-     * 
+     *
      * @param in The stream to read from
      * @return The integer read (MUST BE TREATED WITH SPECIAL CARE TO AVOID SIGNEDNESS)
      */
     public static int readUnsignedIntLE(InputStream in) throws IOException {
-        return (in.read() << 8 * 0) 
+        return (in.read() << 8 * 0)
              | (in.read() << 8 * 1)
              | (in.read() << 8 * 2)
              | (in.read() << 8 * 3);
@@ -111,7 +111,7 @@ public class Utils {
     /**
      * Read an unsigned integer stored in little-endian format from a byte array
      * at a given offset.
-     * 
+     *
      * @param buffer The byte array to read from
      * @param offset The position in buffer to read from
      * @return The integer read (MUST BE TREATED WITH SPECIAL CARE TO AVOID SIGNEDNESS)
@@ -125,7 +125,7 @@ public class Utils {
 
     /**
      * Write the given long value as a 4 byte unsigned integer. Overflow is ignored.
-     * 
+     *
      * @param buffer The buffer to write to
      * @param value The value to write
      */
@@ -135,7 +135,7 @@ public class Utils {
 
     /**
      * Write the given long value as a 4 byte unsigned integer. Overflow is ignored.
-     * 
+     *
      * @param buffer The buffer to write to
      * @param index The position in the buffer at which to begin writing
      * @param value The value to write
@@ -146,7 +146,7 @@ public class Utils {
 
     /**
      * Write an unsigned integer in little-endian format to the {@link OutputStream}.
-     * 
+     *
      * @param out The stream to write to
      * @param value The value to write
      */
@@ -160,7 +160,7 @@ public class Utils {
     /**
      * Write an unsigned integer in little-endian format to a byte array
      * at a given offset.
-     * 
+     *
      * @param buffer The byte array to write to
      * @param offset The position in buffer to write to
      * @param value The value to write
@@ -198,7 +198,7 @@ public class Utils {
 
     /**
      * Get the length for UTF8-encoding a string without encoding it first
-     * 
+     *
      * @param s The string to calculate the length for
      * @return The length when serialized
      */
@@ -244,7 +244,7 @@ public class Utils {
 
     /**
      * Check that the parameter t is not null
-     * 
+     *
      * @param t The object to check
      * @return t if it isn't null
      * @throws NullPointerException if t is null.
@@ -381,7 +381,7 @@ public class Utils {
     public static <T> String join(T[] strs, String seperator) {
         return join(Arrays.asList(strs), seperator);
     }
-    
+
     /**
      * Create a string representation of a list joined by the given separator
      * @param list The list of items
@@ -394,7 +394,7 @@ public class Utils {
         while (iter.hasNext()) {
             sb.append(iter.next());
             if (iter.hasNext())
-                sb.append(seperator);  
+                sb.append(seperator);
         }
         return sb.toString();
     }
@@ -488,7 +488,7 @@ public class Utils {
 
     /**
      * Attempt to read a file as a string
-     * @throws IOException 
+     * @throws IOException
      */
     public static String readFileAsString(String path, Charset charset) throws IOException {
         if (charset == null) charset = Charset.defaultCharset();
@@ -507,4 +507,21 @@ public class Utils {
     public static String readFileAsString(String path) throws IOException {
         return Utils.readFileAsString(path, Charset.defaultCharset());
     }
+
+    /**
+     * Ensure that the given ByteBuffer has at least the requested capacity, copying its contents into a larger buffer if necessary
+     * @param existingBuffer ByteBuffer to check
+     * @param newLength the minimum capacity required
+     * @return the existing buffer if it is already large enough, otherwise a new larger buffer containing its contents
+     */
+    public static ByteBuffer ensureCapacity(ByteBuffer existingBuffer, int newLength) {
+        if (newLength > existingBuffer.capacity()) {
+            ByteBuffer newBuffer = ByteBuffer.allocate(newLength);
+            existingBuffer.flip();
+            newBuffer.put(existingBuffer);
+            return newBuffer;
+        }
+        return existingBuffer;
+    }
+
 }
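
The new ensureCapacity helper above copies the contents of a buffer into a larger one when the requested capacity exceeds the current one, and otherwise returns the buffer untouched. A small sketch of the expected behaviour (the class name is hypothetical):

    // Hypothetical sketch; not part of this commit.
    import java.nio.ByteBuffer;
    import org.apache.kafka.common.utils.Utils;

    public class EnsureCapacitySketch {
        public static void main(String[] args) {
            ByteBuffer buffer = ByteBuffer.allocate(4);
            buffer.putInt(42);                        // buffer is now full
            buffer = Utils.ensureCapacity(buffer, 8); // contents are copied into a larger buffer
            buffer.putInt(43);                        // the returned buffer is ready for further writes
            buffer.flip();
            System.out.println(buffer.getInt() + ", " + buffer.getInt()); // prints 42, 43
        }
    }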

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java b/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
index 13ce519..d6a4019 100644
--- a/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
@@ -39,4 +39,4 @@ public class ClientUtilsTest {
     private void check(String... url) {
         ClientUtils.parseAndValidateAddresses(Arrays.asList(url));
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index f3f8334..d1759ce 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -18,12 +18,16 @@ package org.apache.kafka.clients.producer;
 
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.config.SSLConfigs;
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.test.MockMetricsReporter;
 import org.apache.kafka.test.MockSerializer;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.Properties;
+import java.util.Map;
+import java.util.HashMap;
 
 public class KafkaProducerTest {
 
@@ -50,17 +54,18 @@ public class KafkaProducerTest {
     }
 
     @Test
-    public void testSerializerClose() {
-        Properties props = new Properties();
-        props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "testConstructorClose");
-        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
-        props.setProperty(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
-
+    public void testSerializerClose() throws Exception {
+        Map<String, Object> configs = new HashMap<String, Object>();
+        configs.put(ProducerConfig.CLIENT_ID_CONFIG, "testConstructorClose");
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        configs.put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+        configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL);
+        configs.put(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
         final int oldInitCount = MockSerializer.INIT_COUNT.get();
         final int oldCloseCount = MockSerializer.CLOSE_COUNT.get();
 
         KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(
-                props, new MockSerializer(), new MockSerializer());
+                configs, new MockSerializer(), new MockSerializer());
         Assert.assertEquals(oldInitCount + 2, MockSerializer.INIT_COUNT.get());
         Assert.assertEquals(oldCloseCount, MockSerializer.CLOSE_COUNT.get());
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java
new file mode 100644
index 0000000..f13c21a
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.security.ssl.SSLFactory;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSocket;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Map;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+
+/**
+ * A simple server that takes size-delimited byte arrays and echoes them back to the sender.
+ */
+class EchoServer extends Thread {
+    public final int port;
+    private final ServerSocket serverSocket;
+    private final List<Thread> threads;
+    private final List<Socket> sockets;
+    private SecurityProtocol protocol = SecurityProtocol.PLAINTEXT;
+    private SSLFactory sslFactory;
+    private final AtomicBoolean renegotiate = new AtomicBoolean();
+
+    public EchoServer(Map<String, ?> configs) throws Exception {
+        this.protocol =  configs.containsKey("security.protocol") ?
+            SecurityProtocol.valueOf((String) configs.get("security.protocol")) : SecurityProtocol.PLAINTEXT;
+        if (protocol == SecurityProtocol.SSL) {
+            this.sslFactory = new SSLFactory(SSLFactory.Mode.SERVER);
+            this.sslFactory.configure(configs);
+            SSLContext sslContext = this.sslFactory.sslContext();
+            this.serverSocket = sslContext.getServerSocketFactory().createServerSocket(0);
+        } else {
+            this.serverSocket = new ServerSocket(0);
+        }
+        this.port = this.serverSocket.getLocalPort();
+        this.threads = Collections.synchronizedList(new ArrayList<Thread>());
+        this.sockets = Collections.synchronizedList(new ArrayList<Socket>());
+    }
+
+    public void renegotiate() {
+        renegotiate.set(true);
+    }
+
+    @Override
+    public void run() {
+        try {
+            while (true) {
+                final Socket socket = serverSocket.accept();
+                sockets.add(socket);
+                Thread thread = new Thread() {
+                    @Override
+                    public void run() {
+                        try {
+                            DataInputStream input = new DataInputStream(socket.getInputStream());
+                            DataOutputStream output = new DataOutputStream(socket.getOutputStream());
+                            while (socket.isConnected() && !socket.isClosed()) {
+                                int size = input.readInt();
+                                if (renegotiate.get()) {
+                                    renegotiate.set(false);
+                                    ((SSLSocket) socket).startHandshake();
+                                }
+                                byte[] bytes = new byte[size];
+                                input.readFully(bytes);
+                                output.writeInt(size);
+                                output.write(bytes);
+                                output.flush();
+                            }
+                        } catch (IOException e) {
+                            // ignore
+                        } finally {
+                            try {
+                                socket.close();
+                            } catch (IOException e) {
+                                // ignore
+                            }
+                        }
+                    }
+                };
+                thread.start();
+                threads.add(thread);
+            }
+        } catch (IOException e) {
+            // ignore
+        }
+    }
+
+    public void closeConnections() throws IOException {
+        for (Socket socket : sockets)
+            socket.close();
+    }
+
+    public void close() throws IOException, InterruptedException {
+        this.serverSocket.close();
+        closeConnections();
+        for (Thread t : threads)
+            t.join();
+        join();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java
new file mode 100644
index 0000000..df1205c
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import java.io.IOException;
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+
+import org.apache.kafka.common.config.SSLConfigs;
+import org.apache.kafka.common.security.ssl.SSLFactory;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.TestSSLUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * A set of tests for the selector over SSL. These use a test harness that runs a simple socket server that echoes back responses.
+ */
+
+public class SSLSelectorTest {
+
+    private static final int BUFFER_SIZE = 4 * 1024;
+
+    private EchoServer server;
+    private Selector selector;
+    private ChannelBuilder channelBuilder;
+
+    @Before
+    public void setup() throws Exception {
+        File trustStoreFile = File.createTempFile("truststore", ".jks");
+
+        Map<String, Object> sslServerConfigs = TestSSLUtils.createSSLConfig(false, true, SSLFactory.Mode.SERVER, trustStoreFile, "server");
+        sslServerConfigs.put(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
+        this.server = new EchoServer(sslServerConfigs);
+        this.server.start();
+        Map<String, Object> sslClientConfigs = TestSSLUtils.createSSLConfig(false, false, SSLFactory.Mode.SERVER, trustStoreFile, "client");
+        sslClientConfigs.put(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
+
+        this.channelBuilder = new SSLChannelBuilder(SSLFactory.Mode.CLIENT);
+        this.channelBuilder.configure(sslClientConfigs);
+        this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", new LinkedHashMap<String, String>(), channelBuilder);
+    }
+
+    @After
+    public void teardown() throws Exception {
+        this.selector.close();
+        this.server.close();
+    }
+
+
+    /**
+     * Validate that we can send and receive a message larger than the receive and send buffer size
+     */
+    @Test
+    public void testSendLargeRequest() throws Exception {
+        String node = "0";
+        blockingConnect(node);
+        String big = TestUtils.randomString(10 * BUFFER_SIZE);
+        assertEquals(big, blockingRequest(node, big));
+    }
+
+
+    /**
+     * Validate that when the server disconnects, a client send ends up with that node in the disconnected list.
+     */
+    @Test
+    public void testServerDisconnect() throws Exception {
+        String node = "0";
+        // connect and do a simple request
+        blockingConnect(node);
+        assertEquals("hello", blockingRequest(node, "hello"));
+
+        // disconnect
+        this.server.closeConnections();
+        while (!selector.disconnected().contains(node))
+            selector.poll(1000L);
+
+        // reconnect and do another request
+        blockingConnect(node);
+        assertEquals("hello", blockingRequest(node, "hello"));
+    }
+
+
+    /**
+     * Validate that the client can intentionally disconnect and reconnect
+     */
+    @Test
+    public void testClientDisconnect() throws Exception {
+        String node = "0";
+        blockingConnect(node);
+        selector.disconnect(node);
+        selector.send(createSend(node, "hello1"));
+        selector.poll(10L);
+        assertEquals("Request should not have succeeded", 0, selector.completedSends().size());
+        assertEquals("There should be a disconnect", 1, selector.disconnected().size());
+        assertTrue("The disconnect should be from our node", selector.disconnected().contains(node));
+        blockingConnect(node);
+        assertEquals("hello2", blockingRequest(node, "hello2"));
+    }
+
+    /**
+     * Tests wrap BUFFER_OVERFLOW and unwrap BUFFER_UNDERFLOW
+     * @throws Exception
+     */
+    @Test
+    public void testLargeMessageSequence() throws Exception {
+        int bufferSize = 512 * 1024;
+        String node = "0";
+        int reqs = 50;
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+        String requestPrefix = TestUtils.randomString(bufferSize);
+        sendAndReceive(node, requestPrefix, 0, reqs);
+    }
+
+
+    /**
+     * Test sending an empty string
+     */
+    @Test
+    public void testEmptyRequest() throws Exception {
+        String node = "0";
+        blockingConnect(node);
+        assertEquals("", blockingRequest(node, ""));
+    }
+
+
+    @Test
+    public void testMute() throws Exception {
+        blockingConnect("0");
+        blockingConnect("1");
+        // wait for handshake to finish
+        while (!selector.isChannelReady("0") && !selector.isChannelReady("1"))
+            selector.poll(5);
+        selector.send(createSend("0", "hello"));
+        selector.send(createSend("1", "hi"));
+        selector.mute("1");
+
+        while (selector.completedReceives().isEmpty())
+            selector.poll(5);
+        assertEquals("We should have only one response", 1, selector.completedReceives().size());
+        assertEquals("The response should not be from the muted node", "0", selector.completedReceives().get(0).source());
+        selector.unmute("1");
+        do {
+            selector.poll(5);
+        } while (selector.completedReceives().isEmpty());
+        assertEquals("We should have only one response", 1, selector.completedReceives().size());
+        assertEquals("The response should be from the previously muted node", "1", selector.completedReceives().get(0).source());
+    }
+
+
+    /**
+     * Tests that SSL renegotiation initiated by the server is handled correctly by the client
+     * @throws Exception
+     */
+    @Test
+    public void testRenegotiation() throws Exception {
+        int reqs = 500;
+        String node = "0";
+        // create connections
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+        // send echo requests and receive responses
+        int requests = 0;
+        int responses = 0;
+        int renegotiates = 0;
+        while (!selector.isChannelReady(node)) {
+            selector.poll(1000L);
+        }
+        selector.send(createSend(node, node + "-" + 0));
+        requests++;
+
+        // loop until we complete all requests
+        while (responses < reqs) {
+            selector.poll(0L);
+            if (responses >= 100 && renegotiates == 0) {
+                renegotiates++;
+                server.renegotiate();
+            }
+            assertEquals("No disconnects should have occurred.", 0, selector.disconnected().size());
+
+            // handle any responses we may have gotten
+            for (NetworkReceive receive : selector.completedReceives()) {
+                String[] pieces = asString(receive).split("-");
+                assertEquals("Should be in the form 'conn-counter'", 2, pieces.length);
+                assertEquals("Check the source", receive.source(), pieces[0]);
+                assertEquals("Check that the receive has kindly been rewound", 0, receive.payload().position());
+                assertEquals("Check the request counter", responses, Integer.parseInt(pieces[1]));
+                responses++;
+            }
+
+            // prepare new sends for the next round
+            for (int i = 0; i < selector.completedSends().size() && requests < reqs && selector.isChannelReady(node); i++, requests++) {
+                selector.send(createSend(node, node + "-" + requests));
+            }
+        }
+    }
+
+    private String blockingRequest(String node, String s) throws IOException {
+        selector.send(createSend(node, s));
+        while (true) {
+            selector.poll(1000L);
+            for (NetworkReceive receive : selector.completedReceives())
+                if (receive.source().equals(node))
+                    return asString(receive);
+        }
+    }
+
+    private String asString(NetworkReceive receive) {
+        return new String(Utils.toArray(receive.payload()));
+    }
+
+    private NetworkSend createSend(String node, String s) {
+        return new NetworkSend(node, ByteBuffer.wrap(s.getBytes()));
+    }
+
+    /* connect and wait for the connection to complete */
+    private void blockingConnect(String node) throws IOException {
+        selector.connect(node, new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE);
+        while (!selector.connected().contains(node))
+            selector.poll(10000L);
+        //finish the handshake as well
+        while (!selector.isChannelReady(node))
+            selector.poll(10000L);
+    }
+
+
+    private void sendAndReceive(String node, String requestPrefix, int startIndex, int endIndex) throws Exception {
+        int requests = startIndex;
+        int responses = startIndex;
+        // wait for handshake to finish
+        while (!selector.isChannelReady(node)) {
+            selector.poll(1000L);
+        }
+        selector.send(createSend(node, requestPrefix + "-" + startIndex));
+        requests++;
+        while (responses < endIndex) {
+            // do the i/o
+            selector.poll(0L);
+            assertEquals("No disconnects should have occurred.", 0, selector.disconnected().size());
+
+            // handle requests and responses of the fast node
+            for (NetworkReceive receive : selector.completedReceives()) {
+                assertEquals(requestPrefix + "-" + responses, asString(receive));
+                responses++;
+            }
+
+            for (int i = 0; i < selector.completedSends().size() && requests < endIndex && selector.isChannelReady(node); i++, requests++) {
+                selector.send(createSend(node, requestPrefix + "-" + requests));
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index 158f982..3a684d9 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
@@ -15,16 +15,16 @@ package org.apache.kafka.common.network;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.util.HashMap;
+import java.util.Map;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
-import java.net.Socket;
 import java.nio.ByteBuffer;
 import java.util.*;
 
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.config.SSLConfigs;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -43,13 +43,18 @@ public class SelectorTest {
     private EchoServer server;
     private Time time;
     private Selectable selector;
+    private ChannelBuilder channelBuilder;
 
     @Before
     public void setup() throws Exception {
-        this.server = new EchoServer();
+        Map<String, Object> configs = new HashMap<String, Object>();
+        configs.put(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
+        this.server = new EchoServer(configs);
         this.server.start();
         this.time = new MockTime();
-        this.selector = new Selector(5000, new Metrics(), time, "MetricGroup", new LinkedHashMap<String, String>());
+        this.channelBuilder = new PlaintextChannelBuilder();
+        this.channelBuilder.configure(configs);
+        this.selector = new Selector(5000, new Metrics(), time, "MetricGroup", new LinkedHashMap<String, String>(), channelBuilder);
     }
 
     @After
@@ -208,6 +213,19 @@ public class SelectorTest {
         assertEquals(big, blockingRequest(node, big));
     }
 
+    @Test
+    public void testLargeMessageSequence() throws Exception {
+        int bufferSize = 512 * 1024;
+        String node = "0";
+        int reqs = 50;
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
+        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+        String requestPrefix = TestUtils.randomString(bufferSize);
+        sendAndReceive(node, requestPrefix, 0, reqs);
+    }
+
+
+
     /**
      * Test sending an empty string
      */
@@ -285,71 +303,27 @@ public class SelectorTest {
         return new String(Utils.toArray(receive.payload()));
     }
 
-    /**
-     * A simple server that takes size delimited byte arrays and just echos them back to the sender.
-     */
-    static class EchoServer extends Thread {
-        public final int port;
-        private final ServerSocket serverSocket;
-        private final List<Thread> threads;
-        private final List<Socket> sockets;
-
-        public EchoServer() throws Exception {
-            this.serverSocket = new ServerSocket(0);
-            this.port = this.serverSocket.getLocalPort();
-            this.threads = Collections.synchronizedList(new ArrayList<Thread>());
-            this.sockets = Collections.synchronizedList(new ArrayList<Socket>());
-        }
+    private void sendAndReceive(String node, String requestPrefix, int startIndex, int endIndex) throws Exception {
+        int requests = startIndex;
+        int responses = startIndex;
+        selector.send(createSend(node, requestPrefix + "-" + startIndex));
+        requests++;
+        while (responses < endIndex) {
+            // do the i/o
+            selector.poll(0L);
+            assertEquals("No disconnects should have occurred.", 0, selector.disconnected().size());
 
-        public void run() {
-            try {
-                while (true) {
-                    final Socket socket = serverSocket.accept();
-                    sockets.add(socket);
-                    Thread thread = new Thread() {
-                        public void run() {
-                            try {
-                                DataInputStream input = new DataInputStream(socket.getInputStream());
-                                DataOutputStream output = new DataOutputStream(socket.getOutputStream());
-                                while (socket.isConnected() && !socket.isClosed()) {
-                                    int size = input.readInt();
-                                    byte[] bytes = new byte[size];
-                                    input.readFully(bytes);
-                                    output.writeInt(size);
-                                    output.write(bytes);
-                                    output.flush();
-                                }
-                            } catch (IOException e) {
-                                // ignore
-                            } finally {
-                                try {
-                                    socket.close();
-                                } catch (IOException e) {
-                                    // ignore
-                                }
-                            }
-                        }
-                    };
-                    thread.start();
-                    threads.add(thread);
-                }
-            } catch (IOException e) {
-                // ignore
+            // handle requests and responses of the fast node
+            for (NetworkReceive receive : selector.completedReceives()) {
+                assertEquals(requestPrefix + "-" + responses, asString(receive));
+                responses++;
             }
-        }
-
-        public void closeConnections() throws IOException {
-            for (Socket socket : sockets)
-                socket.close();
-        }
 
-        public void close() throws IOException, InterruptedException {
-            this.serverSocket.close();
-            closeConnections();
-            for (Thread t : threads)
-                t.join();
-            join();
+            for (int i = 0; i < selector.completedSends().size() && requests < endIndex; i++, requests++) {
+                selector.send(createSend(node, requestPrefix + "-" + requests));
+            }
         }
     }
 
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/test/java/org/apache/kafka/common/security/ssl/SSLFactoryTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/SSLFactoryTest.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/SSLFactoryTest.java
new file mode 100644
index 0000000..0aec666
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/ssl/SSLFactoryTest.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.security.ssl;
+
+import javax.net.ssl.*;
+
+import java.io.File;
+import java.util.Map;
+
+import org.apache.kafka.test.TestSSLUtils;
+
+import org.junit.Test;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * A set of tests for SSLFactory. These verify that SSL engines are created in the expected mode with the configured protocols.
+ */
+
+public class SSLFactoryTest {
+
+    @Test
+    public void testSSLFactoryConfiguration() throws Exception {
+        File trustStoreFile = File.createTempFile("truststore", ".jks");
+        Map<String, Object> serverSSLConfig = TestSSLUtils.createSSLConfig(false, true, SSLFactory.Mode.SERVER, trustStoreFile, "server");
+        SSLFactory sslFactory = new SSLFactory(SSLFactory.Mode.SERVER);
+        sslFactory.configure(serverSSLConfig);
+        //host and port are hints
+        SSLEngine engine = sslFactory.createSSLEngine("localhost", 0);
+        assertNotNull(engine);
+        String[] expectedProtocols = {"TLSv1.2"};
+        assertArrayEquals(expectedProtocols, engine.getEnabledProtocols());
+        assertEquals(false, engine.getUseClientMode());
+    }
+
+    @Test
+    public void testClientMode() throws Exception {
+        File trustStoreFile = File.createTempFile("truststore", ".jks");
+        Map<String, Object> clientSSLConfig = TestSSLUtils.createSSLConfig(false, true, SSLFactory.Mode.CLIENT, trustStoreFile, "client");
+        SSLFactory sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT);
+        sslFactory.configure(clientSSLConfig);
+        //host and port are hints
+        SSLEngine engine = sslFactory.createSSLEngine("localhost", 0);
+        assertTrue(engine.getUseClientMode());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
index e7951d8..74ec52b 100755
--- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -52,7 +52,7 @@ public class UtilsTest {
         assertEquals("[::1]:1234", formatAddress("::1", 1234));
         assertEquals("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:5678", formatAddress("2001:db8:85a3:8d3:1319:8a2e:370:7348", 5678));
     }
-    
+
     @Test
     public void testJoin() {
         assertEquals("", Utils.join(Collections.emptyList(), ","));
@@ -108,4 +108,4 @@ public class UtilsTest {
         assertEquals(1, Utils.min(2, 1, 3));
         assertEquals(1, Utils.min(2, 3, 1));
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/test/java/org/apache/kafka/test/MockSelector.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/MockSelector.java b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
index 51eb9d1..7257cad 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockSelector.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
@@ -120,4 +120,8 @@ public class MockSelector implements Selectable {
     public void unmuteAll() {
     }
 
+    @Override
+    public boolean isChannelReady(String id) {
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/test/java/org/apache/kafka/test/TestSSLUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestSSLUtils.java b/clients/src/test/java/org/apache/kafka/test/TestSSLUtils.java
new file mode 100644
index 0000000..c01cf37
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/test/TestSSLUtils.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.test;
+
+import org.apache.kafka.common.config.SSLConfigs;
+import org.apache.kafka.common.security.ssl.SSLFactory;
+import org.apache.kafka.clients.CommonClientConfigs;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.EOFException;
+import java.math.BigInteger;
+import javax.net.ssl.TrustManagerFactory;
+import java.security.*;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+
+import org.bouncycastle.asn1.x500.X500Name;
+import org.bouncycastle.asn1.x509.AlgorithmIdentifier;
+import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo;
+import org.bouncycastle.cert.X509CertificateHolder;
+import org.bouncycastle.cert.X509v1CertificateBuilder;
+import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter;
+import org.bouncycastle.crypto.params.AsymmetricKeyParameter;
+import org.bouncycastle.crypto.util.PrivateKeyFactory;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+import org.bouncycastle.operator.ContentSigner;
+import org.bouncycastle.operator.DefaultDigestAlgorithmIdentifierFinder;
+import org.bouncycastle.operator.DefaultSignatureAlgorithmIdentifierFinder;
+import org.bouncycastle.operator.bc.BcRSAContentSignerBuilder;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.List;
+import java.util.ArrayList;
+
+
+public class TestSSLUtils {
+
+    /**
+     * Create a self-signed X.509 Certificate.
+     * From http://bfo.com/blog/2011/03/08/odds_and_ends_creating_a_new_x_509_certificate.html.
+     *
+     * @param dn the X.509 Distinguished Name, e.g. "CN=Test, L=London, C=GB"
+     * @param pair the KeyPair
+     * @param days how many days from now the Certificate is valid for
+     * @param algorithm the signing algorithm, e.g. "SHA1withRSA"
+     * @return the self-signed certificate
+     * @throws CertificateException thrown if a security error or an IO error occurred.
+     */
+    public static X509Certificate generateCertificate(String dn, KeyPair pair,
+                                                      int days, String algorithm)
+        throws  CertificateException {
+
+        try {
+            Security.addProvider(new BouncyCastleProvider());
+            AlgorithmIdentifier sigAlgId = new DefaultSignatureAlgorithmIdentifierFinder().find(algorithm);
+            AlgorithmIdentifier digAlgId = new DefaultDigestAlgorithmIdentifierFinder().find(sigAlgId);
+            AsymmetricKeyParameter privateKeyAsymKeyParam = PrivateKeyFactory.createKey(pair.getPrivate().getEncoded());
+            SubjectPublicKeyInfo subPubKeyInfo = SubjectPublicKeyInfo.getInstance(pair.getPublic().getEncoded());
+            ContentSigner sigGen = new BcRSAContentSignerBuilder(sigAlgId, digAlgId).build(privateKeyAsymKeyParam);
+            X500Name name = new X500Name(dn);
+            Date from = new Date();
+            Date to = new Date(from.getTime() + days * 86400000L);
+            BigInteger sn = new BigInteger(64, new SecureRandom());
+
+            X509v1CertificateBuilder v1CertGen = new X509v1CertificateBuilder(name, sn, from, to, name, subPubKeyInfo);
+            X509CertificateHolder certificateHolder = v1CertGen.build(sigGen);
+            return new JcaX509CertificateConverter().setProvider("BC").getCertificate(certificateHolder);
+        } catch (CertificateException ce) {
+            throw ce;
+        } catch (Exception e) {
+            throw new CertificateException(e);
+        }
+    }
+
+    public static KeyPair generateKeyPair(String algorithm) throws NoSuchAlgorithmException {
+        KeyPairGenerator keyGen = KeyPairGenerator.getInstance(algorithm);
+        keyGen.initialize(1024);
+        return keyGen.genKeyPair();
+    }
+
+    private static KeyStore createEmptyKeyStore() throws GeneralSecurityException, IOException {
+        KeyStore ks = KeyStore.getInstance("JKS");
+        ks.load(null, null); // initialize
+        return ks;
+    }
+
+    private static void saveKeyStore(KeyStore ks, String filename,
+                                     String password) throws GeneralSecurityException, IOException {
+        FileOutputStream out = new FileOutputStream(filename);
+        try {
+            ks.store(out, password.toCharArray());
+        } finally {
+            out.close();
+        }
+    }
+
+    public static void createKeyStore(String filename,
+                                      String password, String alias,
+                                      Key privateKey, Certificate cert) throws GeneralSecurityException, IOException {
+        KeyStore ks = createEmptyKeyStore();
+        ks.setKeyEntry(alias, privateKey, password.toCharArray(),
+                new Certificate[]{cert});
+        saveKeyStore(ks, filename, password);
+    }
+
+    /**
+     * Creates a keystore with a single key and saves it to a file.
+     *
+     * @param filename String file to save
+     * @param password String store password to set on keystore
+     * @param keyPassword String key password to set on key
+     * @param alias String alias to use for the key
+     * @param privateKey Key to save in keystore
+     * @param cert Certificate to use as certificate chain associated to key
+     * @throws GeneralSecurityException for any error with the security APIs
+     * @throws IOException if there is an I/O error saving the file
+     */
+    public static void createKeyStore(String filename,
+                                      String password, String keyPassword, String alias,
+                                      Key privateKey, Certificate cert) throws GeneralSecurityException, IOException {
+        KeyStore ks = createEmptyKeyStore();
+        ks.setKeyEntry(alias, privateKey, keyPassword.toCharArray(),
+                new Certificate[]{cert});
+        saveKeyStore(ks, filename, password);
+    }
+
+    public static void createTrustStore(String filename,
+                                        String password, String alias,
+                                        Certificate cert) throws GeneralSecurityException, IOException {
+        KeyStore ks = createEmptyKeyStore();
+        ks.setCertificateEntry(alias, cert);
+        saveKeyStore(ks, filename, password);
+    }
+
+    public static <T extends Certificate> void createTrustStore(
+            String filename, String password, Map<String, T> certs) throws GeneralSecurityException, IOException {
+        KeyStore ks = KeyStore.getInstance("JKS");
+        try {
+            FileInputStream in = new FileInputStream(filename);
+            ks.load(in, password.toCharArray());
+            in.close();
+        } catch (EOFException e) {
+            ks = createEmptyKeyStore();
+        }
+        for (Map.Entry<String, T> cert : certs.entrySet()) {
+            ks.setCertificateEntry(cert.getKey(), cert.getValue());
+        }
+        saveKeyStore(ks, filename, password);
+    }
+
+    public static Map<String, X509Certificate> createX509Certificates(KeyPair keyPair)
+        throws GeneralSecurityException {
+        Map<String, X509Certificate> certs = new HashMap<String, X509Certificate>();
+        X509Certificate cert = generateCertificate("CN=localhost, O=localhost", keyPair, 30, "SHA1withRSA");
+        certs.put("localhost", cert);
+        return certs;
+    }
+
+    public static Map<String, Object> createSSLConfig(SSLFactory.Mode mode, File keyStoreFile, String password, String keyPassword,
+                                                      File trustStoreFile, String trustStorePassword) {
+        Map<String, Object> sslConfigs = new HashMap<String, Object>();
+        sslConfigs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); // kafka security protocol
+        sslConfigs.put(SSLConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2"); // protocol to create SSLContext
+
+        if (mode == SSLFactory.Mode.SERVER || (mode == SSLFactory.Mode.CLIENT && keyStoreFile != null)) {
+            sslConfigs.put(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStoreFile.getPath());
+            sslConfigs.put(SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
+            sslConfigs.put(SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, TrustManagerFactory.getDefaultAlgorithm());
+            sslConfigs.put(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, password);
+            sslConfigs.put(SSLConfigs.SSL_KEY_PASSWORD_CONFIG, keyPassword);
+        }
+
+        sslConfigs.put(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreFile.getPath());
+        sslConfigs.put(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePassword);
+        sslConfigs.put(SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");
+        sslConfigs.put(SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, TrustManagerFactory.getDefaultAlgorithm());
+
+        List<String> enabledProtocols  = new ArrayList<String>();
+        enabledProtocols.add("TLSv1.2");
+        sslConfigs.put(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, enabledProtocols);
+
+        return sslConfigs;
+    }
+
+    public static  Map<String, Object> createSSLConfig(boolean useClientCert, boolean trustStore, SSLFactory.Mode mode, File trustStoreFile, String certAlias)
+        throws IOException, GeneralSecurityException {
+        Map<String, X509Certificate> certs = new HashMap<String, X509Certificate>();
+        File keyStoreFile;
+        String password;
+
+        if (mode == SSLFactory.Mode.SERVER)
+            password = "ServerPassword";
+        else
+            password = "ClientPassword";
+
+        String trustStorePassword = "TrustStorePassword";
+
+        if (useClientCert) {
+            keyStoreFile = File.createTempFile("clientKS", ".jks");
+            KeyPair cKP = generateKeyPair("RSA");
+            X509Certificate cCert = generateCertificate("CN=localhost, O=client", cKP, 30, "SHA1withRSA");
+            createKeyStore(keyStoreFile.getPath(), password, "client", cKP.getPrivate(), cCert);
+            certs.put(certAlias, cCert);
+        } else {
+            keyStoreFile = File.createTempFile("serverKS", ".jks");
+            KeyPair sKP = generateKeyPair("RSA");
+            X509Certificate sCert = generateCertificate("CN=localhost, O=server", sKP, 30,
+                                                        "SHA1withRSA");
+            createKeyStore(keyStoreFile.getPath(), password, password, "server", sKP.getPrivate(), sCert);
+            certs.put(certAlias, sCert);
+        }
+
+        if (trustStore) {
+            createTrustStore(trustStoreFile.getPath(), trustStorePassword, certs);
+        }
+
+        Map<String, Object> sslConfig = createSSLConfig(mode, keyStoreFile, password,
+                                                        password, trustStoreFile, trustStorePassword);
+        return sslConfig;
+    }
+
+}
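
The helpers above can also be used on their own to build stores for ad-hoc tests. A sketch (not part of the patch) that generates a self-signed certificate and writes matching key and trust stores; it assumes BouncyCastle is on the test classpath, as it already is for this class, and the class name, aliases and passwords are placeholders:

    // Hypothetical sketch; not part of this commit.
    import java.io.File;
    import java.security.KeyPair;
    import java.security.cert.X509Certificate;
    import org.apache.kafka.test.TestSSLUtils;

    public class TestSSLUtilsSketch {
        public static void main(String[] args) throws Exception {
            KeyPair keyPair = TestSSLUtils.generateKeyPair("RSA");
            X509Certificate cert = TestSSLUtils.generateCertificate("CN=localhost, O=example", keyPair, 30, "SHA1withRSA");
            File keyStoreFile = File.createTempFile("example-keystore", ".jks");
            File trustStoreFile = File.createTempFile("example-truststore", ".jks");
            // Store the private key with its certificate, and trust the same certificate.
            TestSSLUtils.createKeyStore(keyStoreFile.getPath(), "store-password", "key-password", "example",
                                        keyPair.getPrivate(), cert);
            TestSSLUtils.createTrustStore(trustStoreFile.getPath(), "trust-password", "example", cert);
            System.out.println("keystore: " + keyStoreFile + ", truststore: " + trustStoreFile);
        }
    }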

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/core/src/main/scala/kafka/api/FetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala
index 0b6b33a..b9efec2 100644
--- a/core/src/main/scala/kafka/api/FetchResponse.scala
+++ b/core/src/main/scala/kafka/api/FetchResponse.scala
@@ -24,6 +24,7 @@ import kafka.common.{TopicAndPartition, ErrorMapping}
 import kafka.message.{MessageSet, ByteBufferMessageSet}
 import kafka.api.ApiUtils._
 import org.apache.kafka.common.KafkaException
+import org.apache.kafka.common.network.TransportLayer
 import org.apache.kafka.common.network.Send
 import org.apache.kafka.common.network.MultiSend
 
@@ -56,7 +57,7 @@ class PartitionDataSend(val partitionId: Int,
                         val partitionData: FetchResponsePartitionData) extends Send {
   private val messageSize = partitionData.messages.sizeInBytes
   private var messagesSentSize = 0
-
+  private var pending = false
   private val buffer = ByteBuffer.allocate( 4 /** partitionId **/ + FetchResponsePartitionData.headerSize)
   buffer.putInt(partitionId)
   buffer.putShort(partitionData.error)
@@ -64,7 +65,7 @@ class PartitionDataSend(val partitionId: Int,
   buffer.putInt(partitionData.messages.sizeInBytes)
   buffer.rewind()
 
-  override def completed = !buffer.hasRemaining && messagesSentSize >= messageSize
+  override def completed = !buffer.hasRemaining && messagesSentSize >= messageSize && !pending
 
   override def destination: String = ""
 
@@ -77,6 +78,8 @@ class PartitionDataSend(val partitionId: Int,
       messagesSentSize += bytesSent
       written += bytesSent
     }
+    if (channel.isInstanceOf[TransportLayer])
+      pending = channel.asInstanceOf[TransportLayer].hasPendingWrites
     written
   }
 
@@ -111,7 +114,9 @@ class TopicDataSend(val dest: String, val topicData: TopicData) extends Send {
 
   private var sent = 0L
 
-  override def completed: Boolean = sent >= size
+  private var pending = false
+
+  override def completed: Boolean = sent >= size && !pending
 
   override def destination: String = dest
 
@@ -135,6 +140,10 @@ class TopicDataSend(val dest: String, val topicData: TopicData) extends Send {
     if(!buffer.hasRemaining && !sends.completed) {
       written += sends.writeTo(channel)
     }
+
+    if (channel.isInstanceOf[TransportLayer])
+      pending = channel.asInstanceOf[TransportLayer].hasPendingWrites
+
     sent += written
     written
   }
@@ -214,9 +223,11 @@ class FetchResponseSend(val dest: String, val fetchResponse: FetchResponse) exte
 
   private var sent = 0L
 
+  private var pending = false
+
   override def size = 4 /* for size byte */ + payloadSize
 
-  override def completed = sent >= size
+  override def completed = sent >= size && !pending
 
   override def destination = dest
 
@@ -242,7 +253,10 @@ class FetchResponseSend(val dest: String, val fetchResponse: FetchResponse) exte
       written += sends.writeTo(channel)
     }
     sent += written
+
+    if (channel.isInstanceOf[TransportLayer])
+      pending = channel.asInstanceOf[TransportLayer].hasPendingWrites
+
     written
   }
 }
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index dbe784b..649812d 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -28,10 +28,13 @@ import com.yammer.metrics.core.Gauge
 import kafka.cluster.EndPoint
 import kafka.common.KafkaException
 import kafka.metrics.KafkaMetricsGroup
+import kafka.server.KafkaConfig
 import kafka.utils._
 import org.apache.kafka.common.MetricName
-import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.network.InvalidReceiveException
+import org.apache.kafka.common.metrics._
+import org.apache.kafka.common.network.{InvalidReceiveException, ChannelBuilder,
+                                        PlaintextChannelBuilder, SSLChannelBuilder}
+import org.apache.kafka.common.security.ssl.SSLFactory
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.protocol.types.SchemaException
 import org.apache.kafka.common.utils.{SystemTime, Time, Utils}
@@ -42,28 +45,33 @@ import scala.util.control.{NonFatal, ControlThrowable}
 /**
  * An NIO socket server. The threading model is
  *   1 Acceptor thread that handles new connections
- *   N Processor threads that each have their own selector and read requests from sockets
+ *   Acceptor has N Processor threads that each have their own selector and read requests from sockets
  *   M Handler threads that handle requests and produce responses back to the processor threads for writing.
  */
-class SocketServer(val brokerId: Int,
-                   val endpoints: Map[SecurityProtocol, EndPoint],
-                   val numProcessorThreads: Int,
-                   val maxQueuedRequests: Int,
-                   val sendBufferSize: Int,
-                   val recvBufferSize: Int,
-                   val maxRequestSize: Int = Int.MaxValue,
-                   val maxConnectionsPerIp: Int = Int.MaxValue,
-                   val connectionsMaxIdleMs: Long,
-                   val maxConnectionsPerIpOverrides: Map[String, Int],
-                   val time: Time,
-                   val metrics: Metrics) extends Logging with KafkaMetricsGroup {
-  this.logIdent = "[Socket Server on Broker " + brokerId + "], "
-
-  private val processors = new Array[Processor](numProcessorThreads)
+class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time) extends Logging with KafkaMetricsGroup {
+
+  val channelConfigs = config.channelConfigs
+
+  val endpoints = config.listeners
+  val numProcessorThreads = config.numNetworkThreads
+  val maxQueuedRequests = config.queuedMaxRequests
+  val sendBufferSize = config.socketSendBufferBytes
+  val recvBufferSize = config.socketReceiveBufferBytes
+  val maxRequestSize = config.socketRequestMaxBytes
+  val maxConnectionsPerIp = config.maxConnectionsPerIp
+  val connectionsMaxIdleMs = config.connectionsMaxIdleMs
+  val maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides
+  val totalProcessorThreads = numProcessorThreads * endpoints.size
+
+  this.logIdent = "[Socket Server on Broker " + config.brokerId + "], "
+
+  val requestChannel = new RequestChannel(totalProcessorThreads, maxQueuedRequests)
+  private val processors = new Array[Processor](totalProcessorThreads)
+
   private[network] var acceptors =  mutable.Map[EndPoint,Acceptor]()
-  val requestChannel = new RequestChannel(numProcessorThreads, maxQueuedRequests)
 
-  private val allMetricNames = (0 until numProcessorThreads).map { i =>
+
+  private val allMetricNames = (0 until totalProcessorThreads).map { i =>
     val tags = new util.HashMap[String, String]()
     tags.put("networkProcessor", i.toString)
     new MetricName("io-wait-ratio", "socket-server-metrics", tags)
@@ -83,49 +91,31 @@ class SocketServer(val brokerId: Int,
   def startup() {
     val quotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
 
-    newGauge("NetworkProcessorAvgIdlePercent",
-      new Gauge[Double] {
-        def value = allMetricNames.map( metricName =>
-          metrics.metrics().get(metricName).value()).sum / numProcessorThreads
-      }
-    )
-
-
-    this.synchronized {
-      for (i <- 0 until numProcessorThreads) {
-        processors(i) = new Processor(i,
-          time,
-          maxRequestSize,
-          numProcessorThreads,
-          requestChannel,
-          quotas,
-          connectionsMaxIdleMs,
-          portToProtocol,
-          metrics
-          )
-        Utils.newThread("kafka-network-thread-%d-%d".format(brokerId, i), processors(i), false).start()
-      }
-    }
-
-    // register the processor threads for notification of responses
-    requestChannel.addResponseListener((id:Int) => processors(id).wakeup())
-   
-    // start accepting connections
-    // right now we will use the same processors for all ports, since we didn't implement different protocols
-    // in the future, we may implement different processors for SSL and Kerberos
-
     this.synchronized {
+      var processorBeginIndex = 0
       endpoints.values.foreach(endpoint => {
-        val acceptor = new Acceptor(endpoint.host, endpoint.port, processors, sendBufferSize, recvBufferSize, quotas, endpoint.protocolType, portToProtocol)
+        val acceptor = new Acceptor(endpoint.host, endpoint.port, sendBufferSize, recvBufferSize, config.brokerId, requestChannel, processors, processorBeginIndex, numProcessorThreads, quotas,
+          endpoint.protocolType, portToProtocol, channelConfigs,  maxQueuedRequests, maxRequestSize, connectionsMaxIdleMs, metrics, allMetricNames, time)
         acceptors.put(endpoint, acceptor)
         Utils.newThread("kafka-socket-acceptor-%s-%d".format(endpoint.protocolType.toString, endpoint.port), acceptor, false).start()
         acceptor.awaitStartup
+        processorBeginIndex += numProcessorThreads
       })
     }
 
+    newGauge("NetworkProcessorAvgIdlePercent",
+      new Gauge[Double] {
+        def value = allMetricNames.map( metricName =>
+          metrics.metrics().get(metricName).value()).sum / totalProcessorThreads
+      }
+    )
+
     info("Started " + acceptors.size + " acceptor threads")
   }
 
+  // register the processor threads for notification of responses
+  requestChannel.addResponseListener(id => processors(id).wakeup())
+
   /**
    * Shutdown the socket server
    */
@@ -145,8 +135,8 @@ class SocketServer(val brokerId: Int,
       case e: Exception => throw new KafkaException("Tried to check server's port before server was started or checked for port of non-existing protocol", e)
     }
   }
-}
 
+}
 /**
  * A base class with some helper variables and methods
  */
@@ -188,7 +178,7 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ
    * Is the server still running?
    */
   protected def isRunning = alive.get
-  
+
   /**
    * Close the given key and associated socket
    */
@@ -199,8 +189,9 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ
       swallowError(key.cancel())
     }
   }
-  
+
   def close(channel: SocketChannel) {
+
     if(channel != null) {
       debug("Closing connection from " + channel.socket.getRemoteSocketAddress())
       connectionQuotas.dec(channel.socket.getInetAddress)
@@ -213,25 +204,55 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ
 /**
  * Thread that accepts and configures new connections. There is only need for one of these
  */
-private[kafka] class Acceptor(val host: String, 
+private[kafka] class Acceptor(val host: String,
                               private val port: Int,
-                              private val processors: Array[Processor],
-                              val sendBufferSize: Int, 
+                              val sendBufferSize: Int,
                               val recvBufferSize: Int,
+                              brokerId: Int,
+                              requestChannel: RequestChannel,
+                              processors: Array[Processor],
+                              processorBeginIndex: Int,
+                              numProcessorThreads: Int,
                               connectionQuotas: ConnectionQuotas,
                               protocol: SecurityProtocol,
-                              portToProtocol: ConcurrentHashMap[Int, SecurityProtocol]) extends AbstractServerThread(connectionQuotas) {
+                              portToProtocol: ConcurrentHashMap[Int, SecurityProtocol],
+                              channelConfigs: java.util.Map[String, Object],
+                              maxQueuedRequests: Int,
+                              maxRequestSize: Int,
+                              connectionsMaxIdleMs: Long,
+                              metrics: Metrics,
+                              allMetricNames: Seq[MetricName],
+                              time: Time) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
   val nioSelector = java.nio.channels.Selector.open()
   val serverChannel = openServerSocket(host, port)
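+  // this acceptor owns the processor slice [processorBeginIndex, processorEndIndex) and creates those threads below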
+  val processorEndIndex = processorBeginIndex + numProcessorThreads
+
   portToProtocol.put(serverChannel.socket().getLocalPort, protocol)
 
+  this.synchronized {
+    for (i <- processorBeginIndex until processorEndIndex) {
+        processors(i) = new Processor(i,
+          time,
+          maxRequestSize,
+          numProcessorThreads,
+          requestChannel,
+          connectionQuotas,
+          connectionsMaxIdleMs,
+          protocol,
+          channelConfigs,
+          metrics
+          )
+        Utils.newThread("kafka-network-thread-%d-%s-%d".format(brokerId, protocol.name, i), processors(i), false).start()
+    }
+  }
+
   /**
    * Accept loop that checks for new connection attempts
    */
   def run() {
     serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT);
     startupComplete()
-    var currentProcessor = 0
+    var currentProcessor = processorBeginIndex
     while(isRunning) {
       val ready = nioSelector.select(500)
       if(ready > 0) {
@@ -248,7 +269,8 @@ private[kafka] class Acceptor(val host: String,
                throw new IllegalStateException("Unrecognized key state for acceptor thread.")
 
             // round robin to the next processor thread
-            currentProcessor = (currentProcessor + 1) % processors.length
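+            // wrap around within this acceptor's own slice of processors rather than the full array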
+            currentProcessor = (currentProcessor + 1) % processorEndIndex
+            if (currentProcessor < processorBeginIndex) currentProcessor = processorBeginIndex
           } catch {
             case e: Throwable => error("Error while accepting connection", e)
           }
@@ -260,12 +282,12 @@ private[kafka] class Acceptor(val host: String,
     swallowError(nioSelector.close())
     shutdownComplete()
   }
-  
+
   /*
    * Create a server socket to listen for connections on.
    */
   def openServerSocket(host: String, port: Int): ServerSocketChannel = {
-    val socketAddress = 
+    val socketAddress =
       if(host == null || host.trim.isEmpty)
         new InetSocketAddress(port)
       else
@@ -277,7 +299,7 @@ private[kafka] class Acceptor(val host: String,
       serverChannel.socket.bind(socketAddress)
       info("Awaiting socket connections on %s:%d.".format(socketAddress.getHostName, serverChannel.socket.getLocalPort))
     } catch {
-      case e: SocketException => 
+      case e: SocketException =>
         throw new KafkaException("Socket server failed to bind to %s:%d: %s.".format(socketAddress.getHostName, port, e.getMessage), e)
     }
     serverChannel
@@ -328,15 +350,17 @@ private[kafka] class Processor(val id: Int,
                                val requestChannel: RequestChannel,
                                connectionQuotas: ConnectionQuotas,
                                val connectionsMaxIdleMs: Long,
-                               val portToProtocol: ConcurrentHashMap[Int,SecurityProtocol],
+                               val protocol: SecurityProtocol,
+                               val channelConfigs: java.util.Map[String, Object],
                                val metrics: Metrics) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
 
   private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
   private val inflightResponses = mutable.Map[String, RequestChannel.Response]()
-
+  private val channelBuilder = createChannelBuilder
   private val metricTags = new util.HashMap[String, String]()
   metricTags.put("networkProcessor", id.toString)
 
+
   newGauge("IdlePercent",
     new Gauge[Double] {
       def value = {
@@ -353,7 +377,8 @@ private[kafka] class Processor(val id: Int,
     time,
     "socket-server",
     metricTags,
-    false)
+    false,
+    channelBuilder)
 
   override def run() {
     startupComplete()
@@ -379,7 +404,7 @@ private[kafka] class Processor(val id: Int,
         }
         collection.JavaConversions.collectionAsScalaIterable(selector.completedReceives).foreach(receive => {
           try {
-            val req = RequestChannel.Request(processor = id, connectionId = receive.source, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = SecurityProtocol.PLAINTEXT)
+            val req = RequestChannel.Request(processor = id, connectionId = receive.source, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
             requestChannel.sendRequest(req)
           } catch {
             case e @ (_: InvalidRequestException | _: SchemaException) => {
@@ -474,6 +499,14 @@ private[kafka] class Processor(val id: Int,
     }
   }
 
+  private def createChannelBuilder(): ChannelBuilder = {
+    val channelBuilder: ChannelBuilder = if (protocol == SecurityProtocol.SSL)  new SSLChannelBuilder(SSLFactory.Mode.SERVER)
+                                        else new PlaintextChannelBuilder()
+
+    channelBuilder.configure(channelConfigs)
+    channelBuilder
+  }
+
   /**
    * Close all open connections
    */
@@ -492,7 +525,7 @@ private[kafka] class Processor(val id: Int,
 class ConnectionQuotas(val defaultMax: Int, overrideQuotas: Map[String, Int]) {
   private val overrides = overrideQuotas.map(entry => (InetAddress.getByName(entry._1), entry._2))
   private val counts = mutable.Map[InetAddress, Int]()
-  
+
   def inc(addr: InetAddress) {
     counts synchronized {
       val count = counts.getOrElse(addr, 0)
@@ -502,7 +535,7 @@ class ConnectionQuotas(val defaultMax: Int, overrideQuotas: Map[String, Int]) {
         throw new TooManyConnectionsException(addr, max)
     }
   }
-  
+
   def dec(addr: InetAddress) {
     counts synchronized {
       val count = counts.get(addr).get
@@ -512,7 +545,7 @@ class ConnectionQuotas(val defaultMax: Int, overrideQuotas: Map[String, Int]) {
         counts.put(addr, count - 1)
     }
   }
-  
+
 }
 
 class TooManyConnectionsException(val ip: InetAddress, val count: Int) extends KafkaException("Too many connections from %s (maximum = %d)".format(ip, count))


[3/4] kafka git commit: kafka-1690; Add SSL support to Kafka Broker, Producer and Consumer; patched by Sriharsha Chintalapani; reviewed Rajini Sivaram, Joel Koshy, Michael Herstine, Ismael Juma, Dong Lin, Jiangjie Qin and Jun Rao

Posted by ju...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
new file mode 100644
index 0000000..a3567af
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.network;
+
+/*
+ * Transport layer for PLAINTEXT communication
+ */
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.SelectionKey;
+
+import java.security.Principal;
+
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PlaintextTransportLayer implements TransportLayer {
+    private static final Logger log = LoggerFactory.getLogger(PlaintextTransportLayer.class);
+    private final SelectionKey key;
+    private final SocketChannel socketChannel;
+    private final Principal principal = new KafkaPrincipal("ANONYMOUS");
+
+    public PlaintextTransportLayer(SelectionKey key) throws IOException {
+        this.key = key;
+        this.socketChannel = (SocketChannel) key.channel();
+    }
+
+    @Override
+    public boolean ready() {
+        return true;
+    }
+
+    @Override
+    public void finishConnect() throws IOException {
+        socketChannel.finishConnect();
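+        // connection established: stop watching OP_CONNECT and start watching OP_READ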
+        int ops = key.interestOps();
+        ops &= ~SelectionKey.OP_CONNECT;
+        ops |= SelectionKey.OP_READ;
+        key.interestOps(ops);
+    }
+
+    @Override
+    public void disconnect() {
+        key.cancel();
+    }
+
+    @Override
+    public SocketChannel socketChannel() {
+        return socketChannel;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return socketChannel.isOpen();
+    }
+
+    @Override
+    public boolean isConnected() {
+        return socketChannel.isConnected();
+    }
+
+    /**
+     * Closes this channel
+     *
+     * @throws IOException If I/O error occurs
+     */
+    @Override
+    public void close() throws IOException {
+        socketChannel.socket().close();
+        socketChannel.close();
+        key.attach(null);
+        key.cancel();
+    }
+
+    /**
+     * There is no handshake to perform for the plaintext implementation,
+     * so this is a no-op.
+     * @throws IOException
+    */
+    @Override
+    public void handshake() throws IOException {}
+
+    /**
+    * Reads a sequence of bytes from this channel into the given buffer.
+    *
+    * @param dst The buffer into which bytes are to be transferred
+    * @return The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream
+    * @throws IOException if some other I/O error occurs
+    */
+    @Override
+    public int read(ByteBuffer dst) throws IOException {
+        return socketChannel.read(dst);
+    }
+
+    /**
+     * Reads a sequence of bytes from this channel into the given buffers.
+     *
+     * @param dsts - The buffers into which bytes are to be transferred.
+     * @return The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream.
+     * @throws IOException if some other I/O error occurs
+     */
+    @Override
+    public long read(ByteBuffer[] dsts) throws IOException {
+        return socketChannel.read(dsts);
+    }
+
+    /**
+     * Reads a sequence of bytes from this channel into a subsequence of the given buffers.
+     * @param dsts - The buffers into which bytes are to be transferred
+     * @param offset - The offset within the buffer array of the first buffer into which bytes are to be transferred; must be non-negative and no larger than dsts.length.
+     * @param length - The maximum number of buffers to be accessed; must be non-negative and no larger than dsts.length - offset
+     * @return The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream.
+     * @throws IOException if some other I/O error occurs
+     */
+    @Override
+    public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
+        return socketChannel.read(dsts, offset, length);
+    }
+
+    /**
+    * Writes a sequence of bytes to this channel from the given buffer.
+    *
+    * @param src The buffer from which bytes are to be retrieved
+    * @return The number of bytes written, possibly zero
+    * @throws IOException If some other I/O error occurs
+    */
+    @Override
+    public int write(ByteBuffer src) throws IOException {
+        return socketChannel.write(src);
+    }
+
+    /**
+    * Writes a sequence of bytes to this channel from the given buffer.
+    *
+    * @param srcs The buffers from which bytes are to be retrieved
+    * @return The number of bytes written, possibly zero
+    * @throws IOException If some other I/O error occurs
+    */
+    @Override
+    public long write(ByteBuffer[] srcs) throws IOException {
+        return socketChannel.write(srcs);
+    }
+
+    /**
+    * Writes a sequence of bytes to this channel from the subsequence of the given buffers.
+    *
+    * @param srcs The buffers from which bytes are to be retrieved
+    * @param offset The offset within the buffer array of the first buffer from which bytes are to be retrieved; must be non-negative and no larger than srcs.length.
+    * @param length - The maximum number of buffers to be accessed; must be non-negative and no larger than srcs.length - offset.
+    * @return The number of bytes written, possibly zero
+    * @throws IOException If some other I/O error occurs
+    */
+    @Override
+    public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
+        return socketChannel.write(srcs, offset, length);
+    }
+
+    /**
+     * Always returns false, as there are never any pending writes:
+     * we write directly to the socketChannel.
+     */
+    @Override
+    public boolean hasPendingWrites() {
+        return false;
+    }
+
+    /**
+     * Returns ANONYMOUS as Principal.
+     */
+    @Override
+    public Principal peerPrincipal() throws IOException {
+        return principal;
+    }
+
+    /**
+     * Adds the interestOps to selectionKey.
+     * @param ops The interest ops to add
+     */
+    @Override
+    public void addInterestOps(int ops) {
+        key.interestOps(key.interestOps() | ops);
+
+    }
+
+    /**
+     * Removes the interestOps from selectionKey.
+     * @param ops The interest ops to remove
+     */
+    @Override
+    public void removeInterestOps(int ops) {
+        key.interestOps(key.interestOps() & ~ops);
+    }
+
+    @Override
+    public boolean isMute() {
+        return key.isValid() && (key.interestOps() & SelectionKey.OP_READ) == 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java
new file mode 100644
index 0000000..88c218b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.Map;
+
+import org.apache.kafka.common.security.auth.PrincipalBuilder;
+import org.apache.kafka.common.security.ssl.SSLFactory;
+import org.apache.kafka.common.config.SSLConfigs;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.KafkaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SSLChannelBuilder implements ChannelBuilder {
+    private static final Logger log = LoggerFactory.getLogger(SSLChannelBuilder.class);
+    private SSLFactory sslFactory;
+    private PrincipalBuilder principalBuilder;
+    private SSLFactory.Mode mode;
+
+    public SSLChannelBuilder(SSLFactory.Mode mode) {
+        this.mode = mode;
+    }
+
+    public void configure(Map<String, ?> configs) throws KafkaException {
+        try {
+            this.sslFactory = new SSLFactory(mode);
+            this.sslFactory.configure(configs);
+            this.principalBuilder = (PrincipalBuilder) Utils.newInstance((Class<?>) configs.get(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG));
+            this.principalBuilder.configure(configs);
+        } catch (Exception e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException {
+        KafkaChannel channel = null;
+        try {
+            SocketChannel socketChannel = (SocketChannel) key.channel();
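+            // wrap the accepted socket in an SSL transport layer whose SSLEngine is created for the peer's host and port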
+            SSLTransportLayer transportLayer = new SSLTransportLayer(id, key,
+                                                                     sslFactory.createSSLEngine(socketChannel.socket().getInetAddress().getHostName(),
+                                                                                                socketChannel.socket().getPort()));
+            Authenticator authenticator = new DefaultAuthenticator();
+            authenticator.configure(transportLayer, this.principalBuilder);
+            channel = new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
+        } catch (Exception e) {
+            log.info("Failed to create channel", e);
+            throw new KafkaException(e);
+        }
+        return channel;
+    }
+
+    public void close()  {
+        this.principalBuilder.close();
+    }
+}
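To make the builder's lifecycle concrete, a hedged sketch of how the broker-side Processor (see the SocketServer diff above) drives it: configure once with the channel configs (which must include the principal builder class), then build one KafkaChannel per accepted connection. The names channelConfigs, connectionId, key and maxReceiveSize are placeholders here.

    // configure the server-side builder once with keystore/truststore settings
    // plus SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG
    ChannelBuilder channelBuilder = new SSLChannelBuilder(SSLFactory.Mode.SERVER);
    channelBuilder.configure(channelConfigs);

    // later, for each connection registered with the selector
    KafkaChannel channel = channelBuilder.buildChannel(connectionId, key, maxReceiveSize);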

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java
new file mode 100644
index 0000000..8ba1b01
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java
@@ -0,0 +1,690 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.network;
+
+import java.io.IOException;
+import java.io.EOFException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.CancelledKeyException;
+
+import java.security.Principal;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLEngineResult.HandshakeStatus;
+import javax.net.ssl.SSLEngineResult.Status;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.SSLPeerUnverifiedException;
+
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * Transport layer for SSL communication
+ */
+
+public class SSLTransportLayer implements TransportLayer {
+    private static final Logger log = LoggerFactory.getLogger(SSLTransportLayer.class);
+    private final String channelId;
+    protected final SSLEngine sslEngine;
+    private final SelectionKey key;
+    private final SocketChannel socketChannel;
+    private HandshakeStatus handshakeStatus;
+    private SSLEngineResult handshakeResult;
+    private boolean handshakeComplete = false;
+    private boolean closing = false;
+    private ByteBuffer netReadBuffer;
+    private ByteBuffer netWriteBuffer;
+    private ByteBuffer appReadBuffer;
+    private ByteBuffer emptyBuf = ByteBuffer.allocate(0);
+
+    public SSLTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine) throws IOException {
+        this.channelId = channelId;
+        this.key = key;
+        this.socketChannel = (SocketChannel) key.channel();
+        this.sslEngine = sslEngine;
+        this.netReadBuffer = ByteBuffer.allocate(packetBufferSize());
+        this.netWriteBuffer = ByteBuffer.allocate(packetBufferSize());
+        this.appReadBuffer = ByteBuffer.allocate(applicationBufferSize());
+        startHandshake();
+    }
+
+    /**
+     * starts sslEngine handshake process
+     */
+    private void startHandshake() throws IOException {
+        //clear & set netRead & netWrite buffers
+        netWriteBuffer.position(0);
+        netWriteBuffer.limit(0);
+        netReadBuffer.position(0);
+        netReadBuffer.limit(0);
+        handshakeComplete = false;
+        closing = false;
+        //initiate handshake
+        sslEngine.beginHandshake();
+        handshakeStatus = sslEngine.getHandshakeStatus();
+    }
+
+    @Override
+    public boolean ready() {
+        return handshakeComplete;
+    }
+
+    /**
+     * Calls socketChannel.finishConnect() and switches the key's interest from OP_CONNECT to OP_READ.
+     */
+    @Override
+    public void finishConnect() throws IOException {
+        socketChannel.finishConnect();
+        key.interestOps((key.interestOps() & ~SelectionKey.OP_CONNECT) | SelectionKey.OP_READ);
+    }
+
+    /**
+     * disconnects selectionKey.
+     */
+    @Override
+    public void disconnect() {
+        key.cancel();
+    }
+
+    @Override
+    public SocketChannel socketChannel() {
+        return socketChannel;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return socketChannel.isOpen();
+    }
+
+    @Override
+    public boolean isConnected() {
+        return socketChannel.isConnected();
+    }
+
+
+    /**
+    * Sends an SSL close message and closes the socketChannel.
+    * @throws IOException if an I/O error occurs, or if there is data on the outgoing
+    *                     network buffer that could not be flushed
+    */
+    @Override
+    public void close() throws IOException {
+        if (closing) return;
+        closing = true;
+        sslEngine.closeOutbound();
+        try {
+            if (!flush(netWriteBuffer)) {
+                throw new IOException("Remaining data in the network buffer, can't send SSL close message.");
+            }
+            //prep the buffer for the close message
+            netWriteBuffer.clear();
+            //perform the close, since we called sslEngine.closeOutbound
+            SSLEngineResult handshake = sslEngine.wrap(emptyBuf, netWriteBuffer);
+            //we should be in a close state
+            if (handshake.getStatus() != SSLEngineResult.Status.CLOSED) {
+                throw new IOException("Invalid close state, will not send network data.");
+            }
+            netWriteBuffer.flip();
+            flush(netWriteBuffer);
+            socketChannel.socket().close();
+            socketChannel.close();
+        } catch (IOException ie) {
+            log.warn("Failed to send SSL Close message ", ie);
+        }
+        key.attach(null);
+        key.cancel();
+    }
+
+    /**
+     * returns true if there are any pending contents in netWriteBuffer
+     */
+    @Override
+    public boolean hasPendingWrites() {
+        return netWriteBuffer.hasRemaining();
+    }
+
+    /**
+    * Flushes the buffer to the network, non blocking
+    * @param buf ByteBuffer
+    * @return boolean true if the buffer has been emptied out, false otherwise
+    * @throws IOException
+    */
+    private boolean flush(ByteBuffer buf) throws IOException {
+        int remaining = buf.remaining();
+        if (remaining > 0) {
+            int written = socketChannel.write(buf);
+            return written >= remaining;
+        }
+        return true;
+    }
+
+    /**
+    * Performs the SSL handshake, non blocking.
+    * Before application data (Kafka protocol messages) can be sent, the client and the Kafka broker must
+    * complete the SSL handshake.
+    * During the handshake the SSLEngine generates encrypted data that is transported over the socketChannel.
+    * Each SSLEngine operation produces an SSLEngineResult, whose handshakeStatus field is used to
+    * determine which operation is needed to move the handshake along.
+    * A typical handshake might look like this.
+    * +-------------+----------------------------------+-------------+
+    * |  client     |  SSL/TLS message                 | HSStatus    |
+    * +-------------+----------------------------------+-------------+
+    * | wrap()      | ClientHello                      | NEED_UNWRAP |
+    * | unwrap()    | ServerHello/Cert/ServerHelloDone | NEED_WRAP   |
+    * | wrap()      | ClientKeyExchange                | NEED_WRAP   |
+    * | wrap()      | ChangeCipherSpec                 | NEED_WRAP   |
+    * | wrap()      | Finished                         | NEED_UNWRAP |
+    * | unwrap()    | ChangeCipherSpec                 | NEED_UNWRAP |
+    * | unwrap()    | Finished                         | FINISHED    |
+    * +-------------+----------------------------------+-------------+
+    *
+    * @throws IOException
+    */
+    @Override
+    public void handshake() throws IOException {
+        boolean read = key.isReadable();
+        boolean write = key.isWritable();
+        handshakeComplete = false;
+        handshakeStatus = sslEngine.getHandshakeStatus();
+        if (!flush(netWriteBuffer)) {
+            key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+            return;
+        }
+        try {
+            switch (handshakeStatus) {
+                case NEED_TASK:
+                    log.trace("SSLHandshake NEED_TASK channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
+                              channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
+                    handshakeStatus = runDelegatedTasks();
+                    break;
+                case NEED_WRAP:
+                    log.trace("SSLHandshake NEED_WRAP channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
+                              channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
+                    handshakeResult = handshakeWrap(write);
+                    if (handshakeResult.getStatus() == Status.BUFFER_OVERFLOW) {
+                        int currentPacketBufferSize = packetBufferSize();
+                        netWriteBuffer = Utils.ensureCapacity(netWriteBuffer, currentPacketBufferSize);
+                        if (netWriteBuffer.position() >= currentPacketBufferSize) {
+                            throw new IllegalStateException("Buffer overflow when available data size (" + netWriteBuffer.position() +
+                                                            ") >= network buffer size (" + currentPacketBufferSize + ")");
+                        }
+                    } else if (handshakeResult.getStatus() == Status.BUFFER_UNDERFLOW) {
+                        throw new IllegalStateException("Should not have received BUFFER_UNDERFLOW during handshake WRAP.");
+                    } else if (handshakeResult.getStatus() == Status.CLOSED) {
+                        throw new EOFException();
+                    }
+                    log.trace("SSLHandshake NEED_WRAP channelId {}, handshakeResult {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
+                              channelId, handshakeResult, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
+                    //if the handshake status is not NEED_UNWRAP, or we are unable to flush the netWriteBuffer contents,
+                    //we break here; otherwise we fall through and do NEED_UNWRAP in the same call.
+                    if (handshakeStatus != HandshakeStatus.NEED_UNWRAP ||  !flush(netWriteBuffer)) {
+                        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+                        break;
+                    }
+                case NEED_UNWRAP:
+                    log.trace("SSLHandshake NEED_UNWRAP channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
+                              channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
+                    handshakeResult = handshakeUnwrap(read);
+                    if (handshakeResult.getStatus() == Status.BUFFER_UNDERFLOW) {
+                        int currentPacketBufferSize = packetBufferSize();
+                        netReadBuffer = Utils.ensureCapacity(netReadBuffer, currentPacketBufferSize);
+                        if (netReadBuffer.position() >= currentPacketBufferSize) {
+                            throw new IllegalStateException("Buffer underflow when there is available data");
+                        }
+                    } else if (handshakeResult.getStatus() == Status.BUFFER_OVERFLOW) {
+                        int currentAppBufferSize = applicationBufferSize();
+                        appReadBuffer = Utils.ensureCapacity(appReadBuffer, currentAppBufferSize);
+                        if (appReadBuffer.position() > currentAppBufferSize) {
+                            throw new IllegalStateException("Buffer overflow when available data size (" + appReadBuffer.position() +
+                                                            ") > application buffer size (" + currentAppBufferSize + ")");
+                        }
+                    } else if (handshakeResult.getStatus() == Status.CLOSED) {
+                        throw new EOFException("SSL handshake status CLOSED during handshake UNWRAP");
+                    }
+                    log.trace("SSLHandshake NEED_UNWRAP channelId {}, handshakeResult {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
+                              channelId, handshakeResult, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
+
+                    //if the handshake has completed, fall through to the FINISHED case.
+                    //after the handshake is finished there is no data left to read/write in the socketChannel,
+                    //so the selector won't select this channel again unless we go through handshakeFinished here.
+                    if (handshakeStatus != HandshakeStatus.FINISHED) {
+                        if (handshakeStatus == HandshakeStatus.NEED_WRAP) {
+                            key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+                        } else if (handshakeStatus == HandshakeStatus.NEED_UNWRAP) {
+                            key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
+                        }
+                        break;
+                    }
+                case FINISHED:
+                    handshakeFinished();
+                    break;
+                case NOT_HANDSHAKING:
+                    handshakeFinished();
+                    break;
+                default:
+                    throw new IllegalStateException(String.format("Unexpected status [%s]", handshakeStatus));
+            }
+        } catch (SSLException e) {
+            handshakeFailure();
+            throw e;
+        }
+    }
+
+
+    /**
+     * Executes the SSLEngine tasks needed.
+     * @return HandshakeStatus
+     */
+    private HandshakeStatus runDelegatedTasks() {
+        for (;;) {
+            Runnable task = delegatedTask();
+            if (task == null) {
+                break;
+            }
+            task.run();
+        }
+        return sslEngine.getHandshakeStatus();
+    }
+
+    /**
+     * Checks if the handshake status is finished
+     * Sets the interestOps for the selectionKey.
+     */
+    private void handshakeFinished() throws IOException {
+        // SSLEngine.getHandshakeStatus is transient and does not record the FINISHED status reliably:
+        // it can move from FINISHED to NOT_HANDSHAKING once the handshake is completed.
+        // Hence we also check handshakeResult.getHandshakeStatus() to determine whether the handshake finished.
+        if (handshakeResult.getHandshakeStatus() == HandshakeStatus.FINISHED) {
+            //we are complete if we have delivered the last packet
+            handshakeComplete = !netWriteBuffer.hasRemaining();
+            //remove OP_WRITE if we are complete, otherwise we still have data to write
+            if (!handshakeComplete)
+                key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+            else
+                key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
+
+            log.trace("SSLHandshake FINISHED channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {} ",
+                      channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
+        } else {
+            throw new IOException("NOT_HANDSHAKING during handshake");
+        }
+    }
+
+    /**
+    * Performs the WRAP function
+    * @param doWrite boolean
+    * @return SSLEngineResult
+    * @throws IOException
+    */
+    private SSLEngineResult handshakeWrap(boolean doWrite) throws IOException {
+        log.trace("SSLHandshake handshakeWrap channelId {}", channelId);
+        if (netWriteBuffer.hasRemaining())
+            throw new IllegalStateException("handshakeWrap called with netWriteBuffer not empty");
+        //this should never be called with a network buffer that contains data
+        //so we can clear it here.
+        netWriteBuffer.clear();
+        SSLEngineResult result = sslEngine.wrap(emptyBuf, netWriteBuffer);
+        //prepare the results to be written
+        netWriteBuffer.flip();
+        handshakeStatus = result.getHandshakeStatus();
+        if (result.getStatus() == SSLEngineResult.Status.OK &&
+            result.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
+            handshakeStatus = runDelegatedTasks();
+        }
+
+        if (doWrite) flush(netWriteBuffer);
+        return result;
+    }
+
+    /**
+    * Perform handshake unwrap
+    * @param doRead boolean
+    * @return SSLEngineResult
+    * @throws IOException
+    */
+    private SSLEngineResult handshakeUnwrap(boolean doRead) throws IOException {
+        log.trace("SSLHandshake handshakeUnwrap channelId {}", channelId);
+        SSLEngineResult result;
+        boolean cont = false;
+        int read = 0;
+        if (doRead)  {
+            read = socketChannel.read(netReadBuffer);
+            if (read == -1) throw new EOFException("EOF during handshake.");
+        }
+        do {
+            //prepare the buffer with the incoming data
+            netReadBuffer.flip();
+            result = sslEngine.unwrap(netReadBuffer, appReadBuffer);
+            netReadBuffer.compact();
+            handshakeStatus = result.getHandshakeStatus();
+            if (result.getStatus() == SSLEngineResult.Status.OK &&
+                result.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
+                handshakeStatus = runDelegatedTasks();
+            }
+            cont = result.getStatus() == SSLEngineResult.Status.OK &&
+                handshakeStatus == HandshakeStatus.NEED_UNWRAP;
+            log.trace("SSLHandshake handshakeUnwrap: handshakeStatus {}", handshakeStatus);
+        } while (netReadBuffer.position() != 0 && cont);
+
+        return result;
+    }
+
+
+    /**
+    * Reads a sequence of bytes from this channel into the given buffer.
+    *
+    * @param dst The buffer into which bytes are to be transferred
+    * @return The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream
+    * @throws IOException if some other I/O error occurs
+    */
+    @Override
+    public int read(ByteBuffer dst) throws IOException {
+        if (closing) return -1;
+        int read = 0;
+        if (!handshakeComplete) return read;
+
+        //if we have unread decrypted data in appReadBuffer read that into dst buffer.
+        if (appReadBuffer.position() > 0) {
+            read = readFromAppBuffer(dst);
+        }
+
+        if (dst.remaining() > 0) {
+            netReadBuffer = Utils.ensureCapacity(netReadBuffer, packetBufferSize());
+            if (netReadBuffer.remaining() > 0) {
+                int netread = socketChannel.read(netReadBuffer);
+                if (netread == 0) return netread;
+                else if (netread < 0) throw new EOFException("EOF during read");
+            }
+            do {
+                netReadBuffer.flip();
+                SSLEngineResult unwrapResult = sslEngine.unwrap(netReadBuffer, appReadBuffer);
+                netReadBuffer.compact();
+                // handle ssl renegotiation.
+                if (unwrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING) {
+                    log.trace("SSLChannel Read begin renegotiation channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
+                              channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
+                    handshake();
+                    break;
+                }
+
+                if (unwrapResult.getStatus() == Status.OK) {
+                    read += readFromAppBuffer(dst);
+                } else if (unwrapResult.getStatus() == Status.BUFFER_OVERFLOW) {
+                    int currentApplicationBufferSize = applicationBufferSize();
+                    appReadBuffer = Utils.ensureCapacity(appReadBuffer, currentApplicationBufferSize);
+                    if (appReadBuffer.position() >= currentApplicationBufferSize) {
+                        throw new IllegalStateException("Buffer overflow when available data size (" + appReadBuffer.position() +
+                                                        ") >= application buffer size (" + currentApplicationBufferSize + ")");
+                    }
+
+                    // appReadBuffer will be extended up to currentApplicationBufferSize;
+                    // we need to read the existing content into dst before we can unwrap again. If there is no
+                    // space left in dst we break here.
+                    if (dst.hasRemaining())
+                        read += readFromAppBuffer(dst);
+                    else
+                        break;
+                } else if (unwrapResult.getStatus() == Status.BUFFER_UNDERFLOW) {
+                    int currentPacketBufferSize = packetBufferSize();
+                    netReadBuffer = Utils.ensureCapacity(netReadBuffer, currentPacketBufferSize);
+                    if (netReadBuffer.position() >= currentPacketBufferSize) {
+                        throw new IllegalStateException("Buffer underflow when available data size (" + netReadBuffer.position() +
+                                                        ") > packet buffer size (" + currentPacketBufferSize + ")");
+                    }
+                    break;
+                } else if (unwrapResult.getStatus() == Status.CLOSED) {
+                    throw new EOFException();
+                }
+            } while (netReadBuffer.position() != 0);
+        }
+        return read;
+    }
+
+
+    /**
+     * Reads a sequence of bytes from this channel into the given buffers.
+     *
+     * @param dsts - The buffers into which bytes are to be transferred.
+     * @return The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream.
+     * @throws IOException if some other I/O error occurs
+     */
+    @Override
+    public long read(ByteBuffer[] dsts) throws IOException {
+        return read(dsts, 0, dsts.length);
+    }
+
+
+    /**
+     * Reads a sequence of bytes from this channel into a subsequence of the given buffers.
+     * @param dsts - The buffers into which bytes are to be transferred
+     * @param offset - The offset within the buffer array of the first buffer into which bytes are to be transferred; must be non-negative and no larger than dsts.length.
+     * @param length - The maximum number of buffers to be accessed; must be non-negative and no larger than dsts.length - offset
+     * @return The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream.
+     * @throws IOException if some other I/O error occurs
+     */
+    @Override
+    public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
+        if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
+            throw new IndexOutOfBoundsException();
+
+        int totalRead = 0;
+        int i = offset;
+        while (i < offset + length) {
+            if (dsts[i].hasRemaining()) {
+                int read = read(dsts[i]);
+                if (read > 0)
+                    totalRead += read;
+                else
+                    break;
+            }
+            if (!dsts[i].hasRemaining()) {
+                i++;
+            }
+        }
+        return totalRead;
+    }
+
+
+    /**
+    * Writes a sequence of bytes to this channel from the given buffer.
+    *
+    * @param src The buffer from which bytes are to be retrieved
+    * @return The number of bytes written, possibly zero
+    * @throws IOException If some other I/O error occurs
+    */
+    @Override
+    public int write(ByteBuffer src) throws IOException {
+        int written = 0;
+        if (closing) throw new IllegalStateException("Channel is in closing state");
+        if (!handshakeComplete) return written;
+
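+        // data wrapped on a previous call must be flushed to the socket before wrapping more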
+        if (!flush(netWriteBuffer))
+            return written;
+
+        netWriteBuffer.clear();
+        SSLEngineResult wrapResult = sslEngine.wrap(src, netWriteBuffer);
+        netWriteBuffer.flip();
+
+        //handle ssl renegotiation
+        if (wrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING) {
+            handshake();
+            return written;
+        }
+
+        if (wrapResult.getStatus() == Status.OK) {
+            written = wrapResult.bytesConsumed();
+            flush(netWriteBuffer);
+        } else if (wrapResult.getStatus() == Status.BUFFER_OVERFLOW) {
+            int currentPacketBufferSize = packetBufferSize();
+            netWriteBuffer = Utils.ensureCapacity(netWriteBuffer, packetBufferSize());
+            if (netWriteBuffer.position() >= currentPacketBufferSize)
+                throw new IllegalStateException("SSL BUFFER_OVERFLOW when available data size (" + netWriteBuffer.position() + ") >= network buffer size (" + currentPacketBufferSize + ")");
+        } else if (wrapResult.getStatus() == Status.BUFFER_UNDERFLOW) {
+            throw new IllegalStateException("SSL BUFFER_UNDERFLOW during write");
+        } else if (wrapResult.getStatus() == Status.CLOSED) {
+            throw new EOFException();
+        }
+        return written;
+    }
+
+    /**
+    * Writes a sequence of bytes to this channel from the subsequence of the given buffers.
+    *
+    * @param srcs The buffers from which bytes are to be retrieved
+    * @param offset The offset within the buffer array of the first buffer from which bytes are to be retrieved; must be non-negative and no larger than srcs.length.
+    * @param length - The maximum number of buffers to be accessed; must be non-negative and no larger than srcs.length - offset.
+    * @return The number of bytes written, possibly zero
+    * @throws IOException If some other I/O error occurs
+    */
+    @Override
+    public long write(ByteBuffer[] srcs, int offset, int length)  throws IOException {
+        if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
+            throw new IndexOutOfBoundsException();
+        int totalWritten = 0;
+        int i = offset;
+        while (i < offset + length) {
+            if (srcs[i].hasRemaining() || hasPendingWrites()) {
+                int written = write(srcs[i]);
+                if (written > 0) {
+                    totalWritten += written;
+                }
+            }
+            if (!srcs[i].hasRemaining() && !hasPendingWrites()) {
+                i++;
+            } else {
+                // if we are unable to write the current buffer to socketChannel we should break,
+                // as we might have reached max socket send buffer size.
+                break;
+            }
+        }
+        return totalWritten;
+    }
+
+    /**
+    * Writes a sequence of bytes to this channel from the given buffers.
+    *
+    * @param srcs The buffers from which bytes are to be retrieved
+    * @return The number of bytes consumed by SSLEngine.wrap, possibly zero
+    * @throws IOException If some other I/O error occurs
+    */
+    @Override
+    public long write(ByteBuffer[] srcs) throws IOException {
+        return write(srcs, 0, srcs.length);
+    }
+
+
+    /**
+     * Returns the SSLSession's peer principal for the remote host.
+     * @return Principal
+     */
+    public Principal peerPrincipal() throws IOException {
+        try {
+            return sslEngine.getSession().getPeerPrincipal();
+        } catch (SSLPeerUnverifiedException se) {
+            throw new IOException(String.format("Unable to retrieve the peer principal due to %s", se));
+        }
+    }
+
+    /**
+     * Returns the SSLSession once the handshake is established;
+     * throws IllegalStateException if the handshake is not established
+     */
+    public SSLSession sslSession() throws IllegalStateException {
+        return sslEngine.getSession();
+    }
+
+    /**
+     * Adds interestOps to the SelectionKey of the TransportLayer
+     * @param ops SelectionKey interestOps
+     */
+    @Override
+    public void addInterestOps(int ops) {
+        if (!key.isValid())
+            throw new CancelledKeyException();
+        else if (!handshakeComplete)
+            throw new IllegalStateException("handshake is not completed");
+
+        key.interestOps(key.interestOps() | ops);
+    }
+
+    /**
+     * Removes interestOps from the SelectionKey of the TransportLayer
+     * @param ops SelectionKey interestOps
+     */
+    @Override
+    public void removeInterestOps(int ops) {
+        if (!key.isValid())
+            throw new CancelledKeyException();
+        else if (!handshakeComplete)
+            throw new IllegalStateException("handshake is not completed");
+
+        key.interestOps(key.interestOps() & ~ops);
+    }
+
+
+    /**
+     * Returns the delegated task for the SSLEngine.
+     */
+    protected Runnable delegatedTask() {
+        return sslEngine.getDelegatedTask();
+    }
+
+    /**
+     * Transfers appReadBuffer contents (decrypted data) into the dst ByteBuffer
+     * @param dst the destination ByteBuffer
+     * @return the number of bytes transferred
+     */
+    private int readFromAppBuffer(ByteBuffer dst) {
+        appReadBuffer.flip();
+        int remaining = Math.min(appReadBuffer.remaining(), dst.remaining());
+        if (remaining > 0) {
+            int limit = appReadBuffer.limit();
+            appReadBuffer.limit(appReadBuffer.position() + remaining);
+            dst.put(appReadBuffer);
+            appReadBuffer.limit(limit);
+        }
+        appReadBuffer.compact();
+        return remaining;
+    }
+
+    private int packetBufferSize() {
+        return sslEngine.getSession().getPacketBufferSize();
+    }
+
+    private int applicationBufferSize() {
+        return sslEngine.getSession().getApplicationBufferSize();
+    }
+
+    private void handshakeFailure() {
+        //Release all resources such as internal buffers that SSLEngine is managing
+        sslEngine.closeOutbound();
+        try {
+            sslEngine.closeInbound();
+        } catch (SSLException e) {
+            log.debug("SSLEngine.closeInBound() raised an exception.",  e);
+        }
+    }
+
+    @Override
+    public boolean isMute() {
+        return key.isValid() && (key.interestOps() & SelectionKey.OP_READ) == 0;
+    }
+}
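The BUFFER_OVERFLOW handling above follows the standard JSSE pattern: when SSLEngine.wrap cannot fit the encrypted record into netWriteBuffer, the buffer is grown to the session's packet buffer size and the wrap is retried on a later call. A minimal standalone sketch of the same idea, using only the javax.net.ssl API (class and method names below are illustrative and not part of this patch):

    import java.nio.ByteBuffer;

    import javax.net.ssl.SSLEngine;
    import javax.net.ssl.SSLEngineResult;
    import javax.net.ssl.SSLException;

    public class WrapExample {
        /**
         * Wraps src into dst, growing dst by the engine's packet buffer size
         * whenever the engine reports BUFFER_OVERFLOW. Returns the (possibly
         * reallocated) destination buffer holding the encrypted bytes.
         */
        static ByteBuffer wrapWithGrow(SSLEngine engine, ByteBuffer src, ByteBuffer dst) throws SSLException {
            while (true) {
                SSLEngineResult result = engine.wrap(src, dst);
                switch (result.getStatus()) {
                    case OK:
                        return dst;
                    case BUFFER_OVERFLOW:
                        // allocate a larger buffer sized to the engine's packet buffer and retry
                        ByteBuffer bigger = ByteBuffer.allocate(dst.position() + engine.getSession().getPacketBufferSize());
                        dst.flip();
                        bigger.put(dst);
                        dst = bigger;
                        break;
                    default:
                        // CLOSED or BUFFER_UNDERFLOW are not expected when wrapping application data
                        throw new SSLException("Unexpected wrap status: " + result.getStatus());
                }
            }
        }
    }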

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
index 618a0fa..39eae4a 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
@@ -3,15 +3,16 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.network;
 
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.List;
@@ -103,4 +104,9 @@ public interface Selectable {
      */
     public void unmuteAll();
 
-}
\ No newline at end of file
+    /**
+     * Returns true if a channel is ready
+     * @param id The id for the connection
+     */
+    public boolean isChannelReady(String id);
+}
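The new isChannelReady(id) method lets callers confirm that a connection has finished its handshake and authentication before queuing data on it. A hypothetical caller sketch (class and method names below are assumptions, not part of this patch):

    import org.apache.kafka.common.network.Selectable;
    import org.apache.kafka.common.network.Send;

    public class ReadyAwareSender {
        /**
         * Queues the send only once the destination channel is ready; returns
         * false if the caller should retry after a later poll().
         */
        static boolean trySend(Selectable selector, Send send) {
            if (!selector.isChannelReady(send.destination()))
                return false;   // connected, but handshake/authentication not finished yet
            selector.send(send);
            return true;
        }
    }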

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index ce20111..12c911c 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
@@ -43,26 +43,26 @@ import org.slf4j.LoggerFactory;
  * responses.
  * <p>
  * A connection can be added to the nioSelector associated with an integer id by doing
- * 
+ *
  * <pre>
  * nioSelector.connect(&quot;42&quot;, new InetSocketAddress(&quot;google.com&quot;, server.port), 64000, 64000);
  * </pre>
- * 
+ *
  * The connect call does not block on the creation of the TCP connection, so the connect method only begins initiating
  * the connection. The successful invocation of this method does not mean a valid connection has been established.
- * 
+ *
  * Sending requests, receiving responses, processing connection completions, and disconnections on the existing
  * connections are all done using the <code>poll()</code> call.
- * 
+ *
  * <pre>
  * nioSelector.send(new NetworkSend(myDestination, myBytes));
  * nioSelector.send(new NetworkSend(myOtherDestination, myOtherBytes));
  * nioSelector.poll(TIMEOUT_MS);
  * </pre>
- * 
+ *
  * The nioSelector maintains several lists that are reset by each call to <code>poll()</code> which are available via
  * various getters. These are reset by each call to <code>poll()</code>.
- * 
+ *
  * This class is not thread safe!
  */
 public class Selector implements Selectable {
@@ -70,9 +70,10 @@ public class Selector implements Selectable {
     private static final Logger log = LoggerFactory.getLogger(Selector.class);
 
     private final java.nio.channels.Selector nioSelector;
-    private final Map<String, SelectionKey> keys;
+    private final Map<String, KafkaChannel> channels;
     private final List<Send> completedSends;
     private final List<NetworkReceive> completedReceives;
+    private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;
     private final List<String> disconnected;
     private final List<String> connected;
     private final List<String> failedSends;
@@ -80,6 +81,7 @@ public class Selector implements Selectable {
     private final SelectorMetrics sensors;
     private final String metricGrpPrefix;
     private final Map<String, String> metricTags;
+    private final ChannelBuilder channelBuilder;
     private final Map<String, Long> lruConnections;
     private final long connectionsMaxIdleNanos;
     private final int maxReceiveSize;
@@ -91,7 +93,7 @@ public class Selector implements Selectable {
     /**
      * Create a new nioSelector
      */
-    public Selector(int maxReceiveSize, long connectionMaxIdleMs, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags, boolean metricsPerConnection) {
+    public Selector(int maxReceiveSize, long connectionMaxIdleMs, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags, boolean metricsPerConnection, ChannelBuilder channelBuilder) {
         try {
             this.nioSelector = java.nio.channels.Selector.open();
         } catch (IOException e) {
@@ -102,13 +104,15 @@ public class Selector implements Selectable {
         this.time = time;
         this.metricGrpPrefix = metricGrpPrefix;
         this.metricTags = metricTags;
-        this.keys = new HashMap<String, SelectionKey>();
+        this.channels = new HashMap<String, KafkaChannel>();
         this.completedSends = new ArrayList<Send>();
         this.completedReceives = new ArrayList<NetworkReceive>();
+        this.stagedReceives = new HashMap<KafkaChannel, Deque<NetworkReceive>>();
         this.connected = new ArrayList<String>();
         this.disconnected = new ArrayList<String>();
         this.failedSends = new ArrayList<String>();
         this.sensors = new SelectorMetrics(metrics);
+        this.channelBuilder = channelBuilder;
         // initial capacity and load factor are default, we set them explicitly because we want to set accessOrder = true
         this.lruConnections = new LinkedHashMap<String, Long>(16, .75F, true);
         currentTimeNanos = new SystemTime().nanoseconds();
@@ -116,8 +120,8 @@ public class Selector implements Selectable {
         this.metricsPerConnection = metricsPerConnection;
     }
 
-    public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags) {
-        this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, metricTags, true);
+    public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags, ChannelBuilder channelBuilder) {
+        this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, metricTags, true, channelBuilder);
     }
 
     /**
@@ -135,28 +139,29 @@ public class Selector implements Selectable {
      */
     @Override
     public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
-        if (this.keys.containsKey(id))
+        if (this.channels.containsKey(id))
             throw new IllegalStateException("There is already a connection for id " + id);
 
-        SocketChannel channel = SocketChannel.open();
-        channel.configureBlocking(false);
-        Socket socket = channel.socket();
+        SocketChannel socketChannel = SocketChannel.open();
+        socketChannel.configureBlocking(false);
+        Socket socket = socketChannel.socket();
         socket.setKeepAlive(true);
         socket.setSendBufferSize(sendBufferSize);
         socket.setReceiveBufferSize(receiveBufferSize);
         socket.setTcpNoDelay(true);
         try {
-            channel.connect(address);
+            socketChannel.connect(address);
         } catch (UnresolvedAddressException e) {
-            channel.close();
+            socketChannel.close();
             throw new IOException("Can't resolve address: " + address, e);
         } catch (IOException e) {
-            channel.close();
+            socketChannel.close();
             throw e;
         }
-        SelectionKey key = channel.register(this.nioSelector, SelectionKey.OP_CONNECT);
-        key.attach(new Transmissions(id));
-        this.keys.put(id, key);
+        SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
+        KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
+        key.attach(channel);
+        this.channels.put(id, channel);
     }
 
     /**
@@ -164,10 +169,11 @@ public class Selector implements Selectable {
      * Use this on server-side, when a connection is accepted by a different thread but processed by the Selector
      * Note that we are not checking if the connection id is valid - since the connection already exists
      */
-    public void register(String id, SocketChannel channel) throws ClosedChannelException {
-        SelectionKey key = channel.register(nioSelector, SelectionKey.OP_READ);
-        key.attach(new Transmissions(id));
-        this.keys.put(id, key);
+    public void register(String id, SocketChannel socketChannel) throws ClosedChannelException {
+        SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_READ);
+        KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
+        key.attach(channel);
+        this.channels.put(id, channel);
     }
 
     /**
@@ -176,9 +182,9 @@ public class Selector implements Selectable {
      */
     @Override
     public void disconnect(String id) {
-        SelectionKey key = this.keys.get(id);
-        if (key != null)
-            key.cancel();
+        KafkaChannel channel = channelForId(id);
+        if (channel != null)
+            channel.disconnect();
     }
 
     /**
@@ -194,14 +200,15 @@ public class Selector implements Selectable {
      */
     @Override
     public void close() {
-        List<String> connections = new LinkedList<String>(keys.keySet());
+        List<String> connections = new LinkedList<String>(channels.keySet());
         for (String id: connections)
             close(id);
-
         try {
             this.nioSelector.close();
         } catch (IOException e) {
             log.error("Exception closing nioSelector:", e);
+        } catch (SecurityException se) {
+            log.error("Exception closing nioSelector:", se);
         }
     }
 
@@ -210,28 +217,38 @@ public class Selector implements Selectable {
      * @param send The request to send
      */
     public void send(Send send) {
-        SelectionKey key = keyForId(send.destination());
-        Transmissions transmissions = transmissions(key);
-        if (transmissions.hasSend())
-            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
-        transmissions.send = send;
+        KafkaChannel channel = channelForId(send.destination());
+        if (channel == null)
+            throw new IllegalStateException("channel is not connected");
+
         try {
-            key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+            channel.setSend(send);
         } catch (CancelledKeyException e) {
-            close(transmissions.id);
             this.failedSends.add(send.destination());
+            close(channel);
         }
     }
 
     /**
      * Do whatever I/O can be done on each connection without blocking. This includes completing connections, completing
      * disconnections, initiating new sends, or making progress on in-progress sends or receives.
-     * 
+     *
      * When this call is completed the user can check for completed sends, receives, connections or disconnects using
      * {@link #completedSends()}, {@link #completedReceives()}, {@link #connected()}, {@link #disconnected()}. These
      * lists will be cleared at the beginning of each {@link #poll(long)} call and repopulated by the call if there is
      * any completed I/O.
-     * 
+     *
+     * In the "Plaintext" setting, we are using socketChannel to read & write to the network. But for the "SSL" setting,
+     * we encrypt the data before we use socketChannel to write data to the network, and decrypt before we return the responses.
+     * This requires additional buffers to be maintained as we are reading from network, since the data on the wire is encrpyted
+     * we won't be able to read exact no.of bytes as kafka protocol requires. We read as many bytes as we can, up to SSLEngine's
+     * application buffer size. This means we might be reading additional bytes than the requested size.
+     * If there is no further data to read from socketChannel selector won't invoke that channel and we've have additional bytes
+     * in the buffer. To overcome this issue we added "stagedReceives" map which contains per-channel deque. When we are
+     * reading a channel we read as many responses as we can and store them into "stagedReceives" and pop one response during
+     * the poll to add the completedReceives. If there are any active channels in the "stagedReceives" we set "timeout" to 0
+     * and pop response and add to the completedReceives.
+     *
      * @param timeout The amount of time to wait, in milliseconds. If negative, wait indefinitely.
      * @throws IllegalStateException If a send is given for which we have no existing connection or for which there is
      *         already an in-progress send
@@ -239,7 +256,8 @@ public class Selector implements Selectable {
     @Override
     public void poll(long timeout) throws IOException {
         clear();
-
+        if (hasStagedReceives())
+            timeout = 0;
         /* check ready keys */
         long startSelect = time.nanoseconds();
         int readyKeys = select(timeout);
@@ -253,85 +271,73 @@ public class Selector implements Selectable {
             while (iter.hasNext()) {
                 SelectionKey key = iter.next();
                 iter.remove();
-
-                Transmissions transmissions = transmissions(key);
-                SocketChannel channel = channel(key);
+                KafkaChannel channel = channel(key);
 
                 // register all per-connection metrics at once
-                sensors.maybeRegisterConnectionMetrics(transmissions.id);
-                lruConnections.put(transmissions.id, currentTimeNanos);
+                sensors.maybeRegisterConnectionMetrics(channel.id());
+                lruConnections.put(channel.id(), currentTimeNanos);
 
                 try {
                     /* complete any connections that have finished their handshake */
                     if (key.isConnectable()) {
                         channel.finishConnect();
-                        key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
-                        this.connected.add(transmissions.id);
+                        this.connected.add(channel.id());
                         this.sensors.connectionCreated.record();
-                        log.debug("Connection {} created", transmissions.id);
                     }
 
-                    /* read from any connections that have readable data */
-                    if (key.isReadable()) {
-                        if (!transmissions.hasReceive())
-                            transmissions.receive = new NetworkReceive(maxReceiveSize, transmissions.id);
+                    /* if channel is not ready finish prepare */
+                    if (channel.isConnected() && !channel.ready())
+                        channel.prepare();
+
+                    /* if channel is ready read from any connections that have readable data */
+                    if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
+                        NetworkReceive networkReceive;
                         try {
-                            transmissions.receive.readFrom(channel);
+                            while ((networkReceive = channel.read()) != null) {
+                                addToStagedReceives(channel, networkReceive);
+                            }
                         } catch (InvalidReceiveException e) {
-                            log.error("Invalid data received from " + transmissions.id + " closing connection", e);
-                            close(transmissions.id);
-                            this.disconnected.add(transmissions.id);
+                            log.error("Invalid data received from " + channel.id() + " closing connection", e);
+                            close(channel);
+                            this.disconnected.add(channel.id());
                             throw e;
                         }
-                        if (transmissions.receive.complete()) {
-                            transmissions.receive.payload().rewind();
-                            this.completedReceives.add(transmissions.receive);
-                            this.sensors.recordBytesReceived(transmissions.id, transmissions.receive.payload().limit());
-                            transmissions.clearReceive();
-                        }
                     }
 
-                    /* write to any sockets that have space in their buffer and for which we have data */
-                    if (key.isWritable()) {
-                        transmissions.send.writeTo(channel);
-                        if (transmissions.send.completed()) {
-                            this.completedSends.add(transmissions.send);
-                            this.sensors.recordBytesSent(transmissions.id, transmissions.send.size());
-                            transmissions.clearSend();
-                            key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
+                    /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
+                    if (channel.ready() && key.isWritable()) {
+                        Send send = channel.write();
+                        if (send != null) {
+                            this.completedSends.add(send);
+                            this.sensors.recordBytesSent(channel.id(), send.size());
                         }
                     }
 
                     /* cancel any defunct sockets */
                     if (!key.isValid()) {
-                        close(transmissions.id);
-                        this.disconnected.add(transmissions.id);
+                        close(channel);
+                        this.disconnected.add(channel.id());
                     }
                 } catch (IOException e) {
-                    String desc = socketDescription(channel);
+                    String desc = channel.socketDescription();
                     if (e instanceof EOFException || e instanceof ConnectException)
                         log.debug("Connection {} disconnected", desc);
                     else
                         log.warn("Error in I/O with connection to {}", desc, e);
-                    close(transmissions.id);
-                    this.disconnected.add(transmissions.id);
+                    close(channel);
+                    this.disconnected.add(channel.id());
                 }
             }
         }
+
+        addToCompletedReceives();
+
         long endIo = time.nanoseconds();
         this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
         maybeCloseOldestConnection();
     }
 
-    private String socketDescription(SocketChannel channel) {
-        Socket socket = channel.socket();
-        if (socket == null)
-            return "[unconnected socket]";
-        else if (socket.getInetAddress() != null)
-            return socket.getInetAddress().toString();
-        else
-            return socket.getLocalAddress().toString();
-    }
+
 
     @Override
     public List<Send> completedSends() {
@@ -355,32 +361,34 @@ public class Selector implements Selectable {
 
     @Override
     public void mute(String id) {
-        mute(this.keyForId(id));
+        KafkaChannel channel = channelForId(id);
+        mute(channel);
     }
 
-    private void mute(SelectionKey key) {
-        key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
+    private void mute(KafkaChannel channel) {
+        channel.mute();
     }
 
     @Override
     public void unmute(String id) {
-        unmute(this.keyForId(id));
+        KafkaChannel channel = channelForId(id);
+        unmute(channel);
     }
 
-    private void unmute(SelectionKey key) {
-        key.interestOps(key.interestOps() | SelectionKey.OP_READ);
+    private void unmute(KafkaChannel channel) {
+        channel.unmute();
     }
 
     @Override
     public void muteAll() {
-        for (SelectionKey key : this.keys.values())
-            mute(key);
+        for (KafkaChannel channel : this.channels.values())
+            mute(channel);
     }
 
     @Override
     public void unmuteAll() {
-        for (SelectionKey key : this.keys.values())
-            unmute(key);
+        for (KafkaChannel channel : this.channels.values())
+            unmute(channel);
     }
 
     private void maybeCloseOldestConnection() {
@@ -418,7 +426,7 @@ public class Selector implements Selectable {
 
     /**
      * Check for data, waiting up to the given timeout.
-     * 
+     *
      * @param ms Length of time to wait, in milliseconds. If negative, wait indefinitely.
      * @return The number of keys ready
      * @throws IOException
@@ -434,81 +442,107 @@ public class Selector implements Selectable {
 
     /**
      * Begin closing this connection
+     * @param id channel id
      */
     public void close(String id) {
-        SelectionKey key = keyForId(id);
-        lruConnections.remove(id);
-        SocketChannel channel = channel(key);
-        Transmissions trans = transmissions(key);
-        if (trans != null) {
-            this.keys.remove(trans.id);
-            trans.clearReceive();
-            trans.clearSend();
-        }
-        key.attach(null);
-        key.cancel();
+        KafkaChannel channel = this.channels.get(id);
+        if (channel != null)
+            close(channel);
+    }
+
+    /**
+     * Begin closing this connection
+     */
+    private void close(KafkaChannel channel) {
         try {
-            channel.socket().close();
             channel.close();
         } catch (IOException e) {
-            log.error("Exception closing connection to node {}:", trans.id, e);
+            log.error("Exception closing connection to node {}:", channel.id(), e);
         }
+        this.stagedReceives.remove(channel);
+        this.channels.remove(channel.id());
+        this.lruConnections.remove(channel.id());
         this.sensors.connectionClosed.record();
     }
 
     /**
-     * Get the selection key associated with this numeric id
+     * check if channel is ready
      */
-    private SelectionKey keyForId(String id) {
-        SelectionKey key = this.keys.get(id);
-        if (key == null)
-            throw new IllegalStateException("Attempt to write to socket for which there is no open connection. Connection id " + id + " existing connections " + keys.keySet().toString());
-        return key;
+    @Override
+    public boolean isChannelReady(String id) {
+        KafkaChannel channel = channelForId(id);
+        return channel.ready();
     }
 
     /**
-     * Get the transmissions for the given connection
+     * Get the channel associated with this numeric id
      */
-    private Transmissions transmissions(SelectionKey key) {
-        return (Transmissions) key.attachment();
+    private KafkaChannel channelForId(String id) {
+        KafkaChannel channel = this.channels.get(id);
+        if (channel == null)
+            throw new IllegalStateException("Attempt to write to socket for which there is no open connection. Connection id " + id + " existing connections " + channels.keySet().toString());
+        return channel;
     }
 
     /**
-     * Get the socket channel associated with this selection key
+     * Get the channel associated with selectionKey
      */
-    private SocketChannel channel(SelectionKey key) {
-        return (SocketChannel) key.channel();
+    private KafkaChannel channel(SelectionKey key) {
+        return (KafkaChannel) key.attachment();
     }
 
     /**
-     * The id and in-progress send and receive associated with a connection
+     * Check if given channel has a staged receive
      */
-    private static class Transmissions {
-        public String id;
-        public Send send;
-        public NetworkReceive receive;
+    private boolean hasStagedReceive(KafkaChannel channel) {
+        return stagedReceives.containsKey(channel);
+    }
 
-        public Transmissions(String id) {
-            this.id = id;
+    /**
+     * Check if stagedReceives has an unmuted channel
+     */
+    private boolean hasStagedReceives() {
+        for (KafkaChannel channel : this.stagedReceives.keySet()) {
+            if (!channel.isMute())
+                return true;
         }
+        return false;
+    }
 
-        public boolean hasSend() {
-            return this.send != null;
-        }
 
-        public void clearSend() {
-            this.send = null;
-        }
+    /**
+     * Adds a receive to stagedReceives
+     */
+    private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) {
+        if (!stagedReceives.containsKey(channel))
+            stagedReceives.put(channel, new ArrayDeque<NetworkReceive>());
 
-        public boolean hasReceive() {
-            return this.receive != null;
-        }
+        Deque<NetworkReceive> deque = stagedReceives.get(channel);
+        deque.add(receive);
+    }
 
-        public void clearReceive() {
-            this.receive = null;
+    /**
+     * checks if there are any staged receives and adds to completedReceives
+     */
+    private void addToCompletedReceives() {
+        if (this.stagedReceives.size() > 0) {
+            Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter = this.stagedReceives.entrySet().iterator();
+            while (iter.hasNext()) {
+                Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next();
+                KafkaChannel channel = entry.getKey();
+                if (!channel.isMute()) {
+                    Deque<NetworkReceive> deque = entry.getValue();
+                    NetworkReceive networkReceive = deque.poll();
+                    this.completedReceives.add(networkReceive);
+                    this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit());
+                    if (deque.size() == 0)
+                        iter.remove();
+                }
+            }
         }
     }
 
+
     private class SelectorMetrics {
         private final Metrics metrics;
         public final Sensor connectionClosed;
@@ -575,7 +609,7 @@ public class Selector implements Selectable {
             metricName = new MetricName("connection-count", metricGrpName, "The current number of active connections.", metricTags);
             this.metrics.addMetric(metricName, new Measurable() {
                 public double measure(MetricConfig config, long now) {
-                    return keys.size();
+                    return channels.size();
                 }
             });
         }
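To make the staged-receive behaviour described in the poll() javadoc concrete: each channel keeps a deque of decrypted-but-undelivered responses, and each poll drains at most one response per channel into completedReceives so that ordering and muting semantics are preserved. A toy, self-contained illustration of that draining policy (not Kafka code; it ignores muting and metrics):

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.Deque;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;

    public class StagedReceivesDemo {
        // one deque of pending responses per channel id
        private final Map<String, Deque<String>> staged = new HashMap<String, Deque<String>>();

        void stage(String channelId, String response) {
            if (!staged.containsKey(channelId))
                staged.put(channelId, new ArrayDeque<String>());
            staged.get(channelId).add(response);
        }

        // drain at most one response per channel per "poll", as the Selector does
        List<String> pollCompleted() {
            List<String> completed = new ArrayList<String>();
            Iterator<Map.Entry<String, Deque<String>>> iter = staged.entrySet().iterator();
            while (iter.hasNext()) {
                Deque<String> deque = iter.next().getValue();
                completed.add(deque.poll());
                if (deque.isEmpty())
                    iter.remove();
            }
            return completed;
        }
    }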

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/Send.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Send.java b/clients/src/main/java/org/apache/kafka/common/network/Send.java
index 8f6daad..e0d8831 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Send.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Send.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
@@ -33,7 +33,7 @@ public interface Send {
     /**
      * Write some as-yet unwritten bytes from this send to the provided channel. It may take multiple calls for the send
      * to be completely written
-     * @param channel The channel to write to
+     * @param channel The Channel to write to
      * @return The number of bytes written
      * @throws IOException If the write fails
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
new file mode 100644
index 0000000..e9158aa
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.network;
+
+/*
+ * Transport layer for underlying communication.
+ * At a very basic level it is a wrapper around SocketChannel and can be used as a substitute for SocketChannel
+ * and other network Channel implementations.
+ * As NetworkClient replaces BlockingChannel and other implementations, we will be using KafkaChannel as
+ * a network I/O channel.
+ */
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.ScatteringByteChannel;
+import java.nio.channels.GatheringByteChannel;
+
+import java.security.Principal;
+
+
+public interface TransportLayer extends ScatteringByteChannel, GatheringByteChannel {
+
+    /**
+     * Returns true if the channel has completed its handshake and authentication.
+     */
+    boolean ready();
+
+    /**
+     * Finishes the process of connecting a socket channel.
+     */
+    void finishConnect() throws IOException;
+
+    /**
+     * Disconnects the socketChannel
+     */
+    void disconnect();
+
+    /**
+     * Tells whether or not this channel's network socket is connected.
+     */
+    boolean isConnected();
+
+    /**
+     * Returns the underlying socketChannel
+     */
+    SocketChannel socketChannel();
+
+
+    /**
+     * Performs the SSL handshake; this is a no-op for the non-secure
+     * implementation.
+     * @throws IOException
+     */
+    void handshake() throws IOException;
+
+    /**
+     * Returns true if there are any pending writes
+     */
+    boolean hasPendingWrites();
+
+    /**
+     * Returns SSLSession.getPeerPrincipal() if the SSLTransportLayer is used;
+     * for the non-secure implementation returns "ANONYMOUS" as the peerPrincipal
+     */
+    Principal peerPrincipal() throws IOException;
+
+    void addInterestOps(int ops);
+
+    void removeInterestOps(int ops);
+
+    boolean isMute();
+}
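A TransportLayer starts out not ready; the Selector keeps driving handshake() on a connected channel until ready() returns true, and only then performs Kafka protocol reads and writes on it. A hypothetical driver sketch of that flow (class and method names below are assumptions, not part of this patch):

    import java.io.IOException;

    import org.apache.kafka.common.network.TransportLayer;

    public class TransportLayerDriver {
        /**
         * Advances the handshake if needed; returns true once protocol reads
         * and writes may proceed. For the plaintext implementation handshake()
         * is a no-op, so this returns true immediately after the connect.
         */
        static boolean prepare(TransportLayer transportLayer) throws IOException {
            if (!transportLayer.ready())
                transportLayer.handshake();   // one non-blocking handshake step for SSL
            return transportLayer.ready();
        }
    }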

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
index dab1a94..a624741 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
@@ -24,6 +24,8 @@ import java.util.Map;
 public enum SecurityProtocol {
     /** Un-authenticated, non-encrypted channel */
     PLAINTEXT(0, "PLAINTEXT"),
+    /** SSL channel */
+    SSL(1, "SSL"),
     /** Currently identical to PLAINTEXT and used for testing only. We may implement extra instrumentation when testing channel code. */
     TRACE(Short.MAX_VALUE, "TRACE");
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/security/auth/DefaultPrincipalBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/DefaultPrincipalBuilder.java b/clients/src/main/java/org/apache/kafka/common/security/auth/DefaultPrincipalBuilder.java
new file mode 100644
index 0000000..fbbeb9e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/auth/DefaultPrincipalBuilder.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.auth;
+
+import java.util.Map;
+import java.security.Principal;
+
+import org.apache.kafka.common.network.TransportLayer;
+import org.apache.kafka.common.network.Authenticator;
+import org.apache.kafka.common.KafkaException;
+
+/** DefaultPrincipalBuilder which returns the transportLayer's peer Principal **/
+
+public class DefaultPrincipalBuilder implements PrincipalBuilder {
+
+    public void configure(Map<String, ?> configs) {}
+
+    public Principal buildPrincipal(TransportLayer transportLayer, Authenticator authenticator) throws KafkaException {
+        try {
+            return transportLayer.peerPrincipal();
+        } catch (Exception e) {
+            throw new KafkaException("Failed to build principal due to: ", e);
+        }
+    }
+
+    public void close() throws KafkaException {}
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java b/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java
new file mode 100644
index 0000000..277b6ef
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.auth;
+
+import java.security.Principal;
+
+public class KafkaPrincipal implements Principal {
+    private final String name;
+
+    public KafkaPrincipal(String name) {
+        if (name == null)
+            throw new IllegalArgumentException("name is null");
+        this.name = name;
+    }
+
+    @Override
+    public boolean equals(Object object) {
+        if (this == object)
+            return true;
+
+        if (object instanceof KafkaPrincipal) {
+            return name.equals(((KafkaPrincipal) object).getName());
+        }
+
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return name.hashCode();
+    }
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    @Override
+    public String toString() {
+        return name;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/security/auth/PrincipalBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/PrincipalBuilder.java b/clients/src/main/java/org/apache/kafka/common/security/auth/PrincipalBuilder.java
new file mode 100644
index 0000000..b7cc378
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/auth/PrincipalBuilder.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.auth;
+
+/*
+ * PrincipalBuilder for Authenticator
+ */
+
+import org.apache.kafka.common.network.TransportLayer;
+import org.apache.kafka.common.network.Authenticator;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Configurable;
+
+import java.util.Map;
+import java.security.Principal;
+
+public interface PrincipalBuilder extends Configurable {
+
+    /**
+     * Configures this class with the given key-value pairs
+     */
+    public void configure(Map<String, ?> configs);
+
+    /**
+     * Returns the Principal for the connection
+     * @param transportLayer The transport layer of the connection
+     * @param authenticator The authenticator of the connection
+     */
+    Principal buildPrincipal(TransportLayer transportLayer, Authenticator authenticator) throws KafkaException;
+
+    /**
+     * Close this PrincipalBuilder
+     */
+    public void close() throws KafkaException;
+
+}
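Deployments that want something other than the raw peer principal can plug in their own PrincipalBuilder. A hypothetical sketch that maps an SSL peer's X.500 distinguished name down to its CN as a KafkaPrincipal (class name and parsing below are illustrative only, not part of this patch):

    import java.security.Principal;
    import java.util.Map;

    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.network.Authenticator;
    import org.apache.kafka.common.network.TransportLayer;
    import org.apache.kafka.common.security.auth.KafkaPrincipal;
    import org.apache.kafka.common.security.auth.PrincipalBuilder;

    public class CnPrincipalBuilder implements PrincipalBuilder {

        public void configure(Map<String, ?> configs) {}

        public Principal buildPrincipal(TransportLayer transportLayer, Authenticator authenticator) throws KafkaException {
            try {
                String dn = transportLayer.peerPrincipal().getName();  // e.g. "CN=client1, O=Example"
                for (String part : dn.split(",")) {
                    String rdn = part.trim();
                    if (rdn.startsWith("CN="))
                        return new KafkaPrincipal(rdn.substring(3));
                }
                return new KafkaPrincipal(dn);   // fall back to the full name
            } catch (Exception e) {
                throw new KafkaException("Failed to build principal: ", e);
            }
        }

        public void close() throws KafkaException {}
    }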