You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/10/20 23:13:44 UTC

[4/4] kafka git commit: KAFKA-1686; Implement SASL/Kerberos

KAFKA-1686; Implement SASL/Kerberos

This PR implements SASL/Kerberos which was originally submitted by harshach as https://github.com/apache/kafka/pull/191.

I've been submitting PRs to Harsha's branch with fixes and improvements and he has integrated all, but the most recent one. I'm creating this PR so that the Jenkins can run the tests on the branch (they pass locally).

Author: Ismael Juma <is...@juma.me.uk>
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Author: Harsha <ha...@users.noreply.github.com>

Reviewers: Ismael Juma <is...@juma.me.uk>, Rajini Sivaram <ra...@googlemail.com>, Parth Brahmbhatt <br...@gmail.com>, Jun Rao <ju...@gmail.com>

Closes #334 from ijuma/KAFKA-1686-V1


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/403158b5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/403158b5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/403158b5

Branch: refs/heads/trunk
Commit: 403158b54b18cabf93eb15d4c4dd8ab66604bf9f
Parents: 8f32617
Author: Sriharsha Chintalapani <sc...@hortonworks.com>
Authored: Tue Oct 20 14:13:34 2015 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Oct 20 14:13:34 2015 -0700

----------------------------------------------------------------------
 build.gradle                                    |   5 +-
 checkstyle/import-control.xml                   |   6 +-
 .../org/apache/kafka/clients/ClientUtils.java   |   9 +-
 .../kafka/clients/consumer/ConsumerConfig.java  |   8 +
 .../kafka/clients/producer/ProducerConfig.java  |   8 +
 .../apache/kafka/common/config/SaslConfigs.java |  54 ++
 .../kafka/common/network/Authenticator.java     |  11 +-
 .../kafka/common/network/ChannelBuilder.java    |   2 +
 .../kafka/common/network/ChannelBuilders.java   |  25 +-
 .../common/network/DefaultAuthenticator.java    |   5 +-
 .../kafka/common/network/KafkaChannel.java      |   4 +-
 .../apache/kafka/common/network/LoginType.java  |  39 ++
 .../org/apache/kafka/common/network/Mode.java   |  19 +
 .../common/network/PlaintextChannelBuilder.java |   6 +-
 .../common/network/PlaintextTransportLayer.java |   5 +-
 .../kafka/common/network/SSLChannelBuilder.java |  20 +-
 .../kafka/common/network/SSLTransportLayer.java |   9 +-
 .../common/network/SaslChannelBuilder.java      | 113 ++++
 .../apache/kafka/common/network/Selector.java   |   1 +
 .../kafka/common/network/TransportLayer.java    |   3 +-
 .../kafka/common/protocol/SecurityProtocol.java |   4 +
 .../apache/kafka/common/security/JaasUtils.java |  60 +-
 .../common/security/auth/PrincipalBuilder.java  |  19 +-
 .../authenticator/SaslClientAuthenticator.java  | 259 ++++++++
 .../authenticator/SaslServerAuthenticator.java  | 203 +++++++
 .../SaslServerCallbackHandler.java              |  81 +++
 .../common/security/kerberos/KerberosName.java  | 114 ++++
 .../security/kerberos/KerberosNameParser.java   | 103 ++++
 .../common/security/kerberos/KerberosRule.java  | 189 ++++++
 .../kafka/common/security/kerberos/Login.java   | 389 ++++++++++++
 .../common/security/kerberos/LoginManager.java  | 118 ++++
 .../security/kerberos/NoMatchingRule.java       |  27 +
 .../kafka/common/security/ssl/SSLFactory.java   |   4 +-
 .../org/apache/kafka/common/utils/Shell.java    | 304 ++++++++++
 .../org/apache/kafka/common/utils/Time.java     |   6 +-
 .../apache/kafka/common/network/EchoServer.java |   2 +-
 .../kafka/common/network/SSLSelectorTest.java   |   7 +-
 .../common/network/SSLTransportLayerTest.java   |  14 +-
 .../security/kerberos/KerberosNameTest.java     |  59 ++
 .../common/security/ssl/SSLFactoryTest.java     |   9 +-
 .../org/apache/kafka/common/utils/MockTime.java |   4 +-
 .../org/apache/kafka/test/TestSSLUtils.java     |  10 +-
 .../controller/ControllerChannelManager.scala   |   5 +-
 .../main/scala/kafka/network/SocketServer.scala |   5 +-
 .../main/scala/kafka/server/KafkaConfig.scala   |  83 ++-
 .../main/scala/kafka/server/KafkaServer.scala   |  11 +-
 .../kafka/server/ReplicaFetcherThread.scala     |   4 +-
 core/src/test/resources/kafka_jaas.conf         |  29 +
 .../kafka/api/BaseConsumerTest.scala            | 596 +++++++++++++++++++
 .../kafka/api/BaseProducerSendTest.scala        | 383 ++++++++++++
 .../integration/kafka/api/ConsumerTest.scala    | 594 ------------------
 .../kafka/api/IntegrationTestHarness.scala      |   9 +-
 .../kafka/api/PlaintextConsumerTest.scala       |  15 +
 .../kafka/api/PlaintextProducerSendTest.scala   |  54 ++
 .../kafka/api/ProducerSendTest.scala            | 416 -------------
 .../integration/kafka/api/SSLConsumerTest.scala | 231 -------
 .../kafka/api/SSLProducerSendTest.scala         | 240 --------
 .../kafka/api/SaslPlaintextConsumerTest.scala   |  19 +
 .../kafka/api/SaslSslConsumerTest.scala         |  22 +
 .../integration/kafka/api/SaslTestHarness.scala |  63 ++
 .../integration/kafka/api/SslConsumerTest.scala |  22 +
 .../kafka/api/SslProducerSendTest.scala         |  27 +
 .../integration/BaseTopicMetadataTest.scala     |  12 +-
 .../integration/KafkaServerTestHarness.scala    |   7 +-
 .../PlaintextTopicMetadataTest.scala            |   3 +
 .../SaslPlaintextTopicMetadataTest.scala        |  26 +
 .../integration/SaslSslTopicMetadataTest.scala  |  28 +
 .../integration/SslTopicMetadataTest.scala      |   5 +-
 .../unit/kafka/network/SocketServerTest.scala   |   3 +-
 .../kafka/server/BaseReplicaFetchTest.scala     |  10 +-
 .../unit/kafka/server/KafkaConfigTest.scala     |   8 +
 .../server/PlaintextReplicaFetchTest.scala      |   3 +
 .../server/SaslPlaintextReplicaFetchTest.scala  |  26 +
 .../kafka/server/SaslSslReplicaFetchTest.scala  |  28 +
 .../unit/kafka/server/SslReplicaFetchTest.scala |   5 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala | 134 +++--
 76 files changed, 3809 insertions(+), 1654 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 2b0f66b..cea945e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -268,8 +268,8 @@ project(':core') {
     testCompile "org.scalatest:scalatest_$baseScalaVersion:2.2.5"
     testCompile project(':clients')
     testCompile project(':clients').sourceSets.test.output
+    testCompile 'org.apache.hadoop:hadoop-minikdc:2.7.1'
     testRuntime "$slf4jlog4j"
-
     zinc 'com.typesafe.zinc:zinc:0.3.7'
   }
 
@@ -282,6 +282,9 @@ project(':core') {
     compile.exclude module: 'jmxtools'
     compile.exclude module: 'mail'
     compile.exclude module: 'netty'
+    // To prevent a UniqueResourceException due the same resource existing in both
+    // org.apache.directory.api/api-all and org.apache.directory.api/api-ldap-schema-data
+    testCompile.exclude module: 'api-ldap-schema-data'
   }
 
   tasks.create(name: "copyDependantLibs", type: Copy) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 289f1d0..6474865 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -29,15 +29,17 @@
   <allow pkg="org.junit" />
   <allow pkg="org.easymock" />
   <allow pkg="org.powermock" />
-
+  <allow pkg="java.security" />
   <allow pkg="javax.net.ssl" />
-  <allow pkg="javax.security.auth" />
+  <allow pkg="javax.security" />
+  <allow pkg="org.ietf.jgss" />
 
   <!-- no one depends on the server -->
   <disallow pkg="kafka" />
 
   <!-- anyone can use public classes -->
   <allow pkg="org.apache.kafka.common" exact-match="true" />
+  <allow pkg="org.apache.kafka.common.security" />
   <allow pkg="org.apache.kafka.common.utils" />
 
   <subpackage name="common">

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
index e7514f8..41e5d74 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
@@ -20,9 +20,10 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.kafka.common.network.ChannelBuilders;
+import org.apache.kafka.common.network.LoginType;
+import org.apache.kafka.common.network.Mode;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.network.ChannelBuilder;
-import org.apache.kafka.common.security.ssl.SSLFactory;
 import org.apache.kafka.common.config.ConfigException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -74,9 +75,9 @@ public class ClientUtils {
      */
     public static ChannelBuilder createChannelBuilder(Map<String, ?> configs) {
         SecurityProtocol securityProtocol = SecurityProtocol.valueOf((String) configs.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
-        if (securityProtocol != SecurityProtocol.SSL && securityProtocol != SecurityProtocol.PLAINTEXT)
-            throw new ConfigException("Invalid SecurityProtocol " + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);
-        return ChannelBuilders.create(securityProtocol, SSLFactory.Mode.CLIENT, configs);
+        if (securityProtocol == SecurityProtocol.TRACE)
+            throw new ConfigException("Invalid SecurityProtocol " + securityProtocol);
+        return ChannelBuilders.create(securityProtocol, Mode.CLIENT, LoginType.CLIENT, configs);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 1042208..1894822 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -19,6 +19,7 @@ import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.config.SSLConfigs;
+import org.apache.kafka.common.config.SaslConfigs;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -302,6 +303,13 @@ public class ConsumerConfig extends AbstractConfig {
                                 .define(SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, Importance.LOW, SSLConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
                                 .define(SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, Importance.LOW, SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
                                 .define(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, Type.STRING, Importance.LOW, SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false)
+                                .define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, Type.STRING, Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC, false)
+                                .define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC)
+                                .define(SaslConfigs.SASL_KAFKA_SERVER_REALM, Type.STRING, Importance.LOW, SaslConfigs.SASL_KAFKA_SERVER_DOC, false)
+                                .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC)
+                                .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER, Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC)
+                                .define(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, Type.LONG, SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN, Importance.LOW, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC)
+                                .define(SaslConfigs.AUTH_TO_LOCAL, Type.LIST, SaslConfigs.DEFAULT_AUTH_TO_LOCAL, Importance.MEDIUM, SaslConfigs.AUTH_TO_LOCAL_DOC)
                                 .define(REQUEST_TIMEOUT_MS_CONFIG,
                                         Type.INT,
                                         40 * 1000,

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 4badd33..b3cfe70 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -23,6 +23,7 @@ import java.util.Properties;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.SSLConfigs;
+import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
@@ -279,6 +280,13 @@ public class ProducerConfig extends AbstractConfig {
                                 .define(SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, Importance.LOW, SSLConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
                                 .define(SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, Importance.LOW, SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
                                 .define(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, Type.STRING, Importance.LOW, SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false)
+                                .define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, Type.STRING, Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC, false)
+                                .define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC)
+                                .define(SaslConfigs.SASL_KAFKA_SERVER_REALM, Type.STRING, Importance.LOW, SaslConfigs.SASL_KAFKA_SERVER_DOC, false)
+                                .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC)
+                                .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER, Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC)
+                                .define(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, Type.LONG, SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN, Importance.LOW, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC)
+                                .define(SaslConfigs.AUTH_TO_LOCAL, Type.LIST, SaslConfigs.DEFAULT_AUTH_TO_LOCAL, Importance.MEDIUM, SaslConfigs.AUTH_TO_LOCAL_DOC)
                                 /* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */
                                 .define(CONNECTIONS_MAX_IDLE_MS_CONFIG, Type.LONG, 9 * 60 * 1000, Importance.MEDIUM, CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
                                 .define(PARTITIONER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.producer.internals.DefaultPartitioner", Importance.MEDIUM, PARTITIONER_CLASS_DOC);

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
new file mode 100644
index 0000000..0abefe7
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common.config;
+
+import java.util.Collections;
+import java.util.List;
+
+public class SaslConfigs {
+    /*
+     * NOTE: DO NOT CHANGE EITHER CONFIG NAMES AS THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE.
+     */
+
+    public static final String SASL_KAFKA_SERVER_REALM = "sasl.kafka.server.realm";
+    public static final String SASL_KAFKA_SERVER_DOC = "The sasl kafka server realm. "
+        + "Default will be from kafka jaas config";
+
+    public static final String SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name";
+    public static final String SASL_KERBEROS_SERVICE_NAME_DOC = "The Kerberos principal name that Kafka runs as. "
+            + "This can be defined either in the JAAS config or in the Kakfa config.";
+
+    public static final String SASL_KERBEROS_KINIT_CMD = "sasl.kerberos.kinit.cmd";
+    public static final String SASL_KERBEROS_KINIT_CMD_DOC = "Kerberos kinit command path. "
+        + "Default will be /usr/bin/kinit";
+    public static final String DEFAULT_KERBEROS_KINIT_CMD = "/usr/bin/kinit";
+
+    public static final String SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR = "sasl.kerberos.ticket.renew.window.factor";
+    public static final String SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC = "LoginThread will sleep until specified window factor of time from last refresh"
+        + " to ticket's expiry has been reached, at which time it will wake and try to renew the ticket.";
+    public static final double DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR = 0.80;
+
+    public static final String SASL_KERBEROS_TICKET_RENEW_JITTER = "sasl.kerberos.ticket.renew.jitter";
+    public static final String SASL_KERBEROS_TICKET_RENEW_JITTER_DOC = "Percentage of random jitter added to the renewal time";
+    public static final double DEFAULT_KERBEROS_TICKET_RENEW_JITTER = 0.05;
+
+    public static final String SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN = "sasl.kerberos.min.time.before.relogin";
+    public static final String SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC = "LoginThread sleep time between refresh attempts";
+    public static final long DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN = 1 * 60 * 1000L;
+
+    public static final String AUTH_TO_LOCAL = "kafka.security.auth.to.local";
+    public static final String AUTH_TO_LOCAL_DOC = "Rules for the mapping between principal names and operating system user names";
+    public static final List<String> DEFAULT_AUTH_TO_LOCAL = Collections.singletonList("DEFAULT");
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java b/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java
index 261f571..7f6eb8c 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java
@@ -22,6 +22,7 @@ package org.apache.kafka.common.network;
  */
 
 import java.io.IOException;
+import java.util.Map;
 import java.security.Principal;
 
 import org.apache.kafka.common.security.auth.PrincipalBuilder;
@@ -30,11 +31,13 @@ import org.apache.kafka.common.KafkaException;
 public interface Authenticator {
 
     /**
-     * configures Authenticator using principalbuilder and transportLayer.
-     * @param TransportLayer transportLayer
-     * @param PrincipalBuilder principalBuilder
+     * Configures Authenticator using the provided parameters.
+     *
+     * @param transportLayer The transport layer used to read or write tokens
+     * @param principalBuilder The builder used to construct `Principal`
+     * @param configs Additional configuration parameters as key/value pairs
      */
-    void configure(TransportLayer transportLayer, PrincipalBuilder principalBuilder);
+    void configure(TransportLayer transportLayer, PrincipalBuilder principalBuilder, Map<String, ?> configs);
 
     /**
      * Implements any authentication mechanism. Use transportLayer to read or write tokens.

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java
index 52a7aab..0b7c328 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java
@@ -32,6 +32,8 @@ public interface ChannelBuilder {
      * returns a Channel with TransportLayer and Authenticator configured.
      * @param  id  channel id
      * @param  key SelectionKey
+     * @param  maxReceiveSize
+     * @return KafkaChannel
      */
     KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
index 2332d3f..1e5d840 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
@@ -14,7 +14,6 @@
 package org.apache.kafka.common.network;
 
 import org.apache.kafka.common.protocol.SecurityProtocol;
-import org.apache.kafka.common.security.ssl.SSLFactory;
 
 import java.util.Map;
 
@@ -24,20 +23,28 @@ public class ChannelBuilders {
 
     /**
      * @param securityProtocol the securityProtocol
-     * @param mode the SSL mode, it must be non-null if `securityProcol` is `SSL` and it is ignored otherwise
+     * @param mode the mode, it must be non-null if `securityProtocol` is not `PLAINTEXT`;
+     *             it is ignored otherwise
+     * @param loginType the loginType, it must be non-null if `securityProtocol` is SASL_*; it is ignored otherwise
      * @param configs client/server configs
      * @return the configured `ChannelBuilder`
      * @throws IllegalArgumentException if `mode` invariants described above is not maintained
      */
-    public static ChannelBuilder create(SecurityProtocol securityProtocol, SSLFactory.Mode mode, Map<String, ?> configs) {
-        ChannelBuilder channelBuilder = null;
+    public static ChannelBuilder create(SecurityProtocol securityProtocol, Mode mode, LoginType loginType, Map<String, ?> configs) {
+        ChannelBuilder channelBuilder;
 
         switch (securityProtocol) {
             case SSL:
-                if (mode == null)
-                    throw new IllegalArgumentException("`mode` must be non-null if `securityProtocol` is `SSL`");
+                requireNonNullMode(mode, securityProtocol);
                 channelBuilder = new SSLChannelBuilder(mode);
                 break;
+            case SASL_SSL:
+            case SASL_PLAINTEXT:
+                requireNonNullMode(mode, securityProtocol);
+                if (loginType == null)
+                    throw new IllegalArgumentException("`loginType` must be non-null if `securityProtocol` is `" + securityProtocol + "`");
+                channelBuilder = new SaslChannelBuilder(mode, loginType, securityProtocol);
+                break;
             case PLAINTEXT:
             case TRACE:
                 channelBuilder = new PlaintextChannelBuilder();
@@ -49,4 +56,10 @@ public class ChannelBuilders {
         channelBuilder.configure(configs);
         return channelBuilder;
     }
+
+    private static void requireNonNullMode(Mode mode, SecurityProtocol securityProtocol) {
+        if (mode == null)
+            throw new IllegalArgumentException("`mode` must be non-null if `securityProtocol` is `" + securityProtocol + "`");
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java
index 813a4aa..650ad41 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.network;
 
 import java.security.Principal;
 import java.io.IOException;
+import java.util.Map;
 
 import org.apache.kafka.common.security.auth.PrincipalBuilder;
 import org.apache.kafka.common.KafkaException;
@@ -29,7 +30,7 @@ public class DefaultAuthenticator implements Authenticator {
     private PrincipalBuilder principalBuilder;
     private Principal principal;
 
-    public void configure(TransportLayer transportLayer, PrincipalBuilder principalBuilder) {
+    public void configure(TransportLayer transportLayer, PrincipalBuilder principalBuilder, Map<String, ?> configs) {
         this.transportLayer = transportLayer;
         this.principalBuilder = principalBuilder;
     }
@@ -54,7 +55,7 @@ public class DefaultAuthenticator implements Authenticator {
 
     /**
      * DefaultAuthenticator doesn't implement any additional authentication mechanism.
-     * @returns true
+     * @return true
      */
     public boolean complete() {
         return true;

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
index 172f4cd..ac436c3 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -61,11 +61,9 @@ public class KafkaChannel {
     }
 
     /**
-     * Does handshake of transportLayer and Authentication using configured authenticator
+     * Does handshake of transportLayer and authentication using configured authenticator
      */
     public void prepare() throws IOException {
-        if (transportLayer.ready() && authenticator.complete())
-            return;
         if (!transportLayer.ready())
             transportLayer.handshake();
         if (transportLayer.ready() && !authenticator.complete())

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/main/java/org/apache/kafka/common/network/LoginType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/LoginType.java b/clients/src/main/java/org/apache/kafka/common/network/LoginType.java
new file mode 100644
index 0000000..9216cb0
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/network/LoginType.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import org.apache.kafka.common.security.JaasUtils;
+
+/**
+ * The type of the login context, it should be SERVER for the broker and CLIENT for the clients (i.e. consumer and
+ * producer). It provides the the login context name which defines the section of the JAAS configuration file to be used
+ * for login.
+ */
+public enum LoginType {
+    CLIENT(JaasUtils.LOGIN_CONTEXT_CLIENT),
+    SERVER(JaasUtils.LOGIN_CONTEXT_SERVER);
+
+    private final String contextName;
+
+    LoginType(String contextName) {
+        this.contextName = contextName;
+    }
+
+    public String contextName() {
+        return contextName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/main/java/org/apache/kafka/common/network/Mode.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Mode.java b/clients/src/main/java/org/apache/kafka/common/network/Mode.java
new file mode 100644
index 0000000..67de44d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/network/Mode.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+public enum Mode { CLIENT, SERVER };

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
index 76dbf93..a028159 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
@@ -27,11 +27,13 @@ import org.slf4j.LoggerFactory;
 public class PlaintextChannelBuilder implements ChannelBuilder {
     private static final Logger log = LoggerFactory.getLogger(PlaintextChannelBuilder.class);
     private PrincipalBuilder principalBuilder;
+    private Map<String, ?> configs;
 
     public void configure(Map<String, ?> configs) throws KafkaException {
         try {
+            this.configs = configs;
             this.principalBuilder = (PrincipalBuilder) Utils.newInstance((Class<?>) configs.get(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG));
-            this.principalBuilder.configure(configs);
+            this.principalBuilder.configure(this.configs);
         } catch (Exception e) {
             throw new KafkaException(e);
         }
@@ -42,7 +44,7 @@ public class PlaintextChannelBuilder implements ChannelBuilder {
         try {
             PlaintextTransportLayer transportLayer = new PlaintextTransportLayer(key);
             Authenticator authenticator = new DefaultAuthenticator();
-            authenticator.configure(transportLayer, this.principalBuilder);
+            authenticator.configure(transportLayer, this.principalBuilder, this.configs);
             channel = new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
         } catch (Exception e) {
             log.warn("Failed to create channel due to ", e);

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
index 1149c99..8949e5e 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
@@ -52,10 +52,7 @@ public class PlaintextTransportLayer implements TransportLayer {
     @Override
     public void finishConnect() throws IOException {
         socketChannel.finishConnect();
-        int ops = key.interestOps();
-        ops &= ~SelectionKey.OP_CONNECT;
-        ops |= SelectionKey.OP_READ;
-        key.interestOps(ops);
+        key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java
index e2cce5c..1dd1ecd 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java
@@ -29,18 +29,20 @@ public class SSLChannelBuilder implements ChannelBuilder {
     private static final Logger log = LoggerFactory.getLogger(SSLChannelBuilder.class);
     private SSLFactory sslFactory;
     private PrincipalBuilder principalBuilder;
-    private SSLFactory.Mode mode;
+    private Mode mode;
+    private Map<String, ?> configs;
 
-    public SSLChannelBuilder(SSLFactory.Mode mode) {
+    public SSLChannelBuilder(Mode mode) {
         this.mode = mode;
     }
 
     public void configure(Map<String, ?> configs) throws KafkaException {
         try {
+            this.configs = configs;
             this.sslFactory = new SSLFactory(mode);
-            this.sslFactory.configure(configs);
+            this.sslFactory.configure(this.configs);
             this.principalBuilder = (PrincipalBuilder) Utils.newInstance((Class<?>) configs.get(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG));
-            this.principalBuilder.configure(configs);
+            this.principalBuilder.configure(this.configs);
         } catch (Exception e) {
             throw new KafkaException(e);
         }
@@ -51,7 +53,7 @@ public class SSLChannelBuilder implements ChannelBuilder {
         try {
             SSLTransportLayer transportLayer = buildTransportLayer(sslFactory, id, key);
             Authenticator authenticator = new DefaultAuthenticator();
-            authenticator.configure(transportLayer, this.principalBuilder);
+            authenticator.configure(transportLayer, this.principalBuilder, this.configs);
             channel = new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
         } catch (Exception e) {
             log.info("Failed to create channel due to ", e);
@@ -66,10 +68,8 @@ public class SSLChannelBuilder implements ChannelBuilder {
 
     protected SSLTransportLayer buildTransportLayer(SSLFactory sslFactory, String id, SelectionKey key) throws IOException {
         SocketChannel socketChannel = (SocketChannel) key.channel();
-        SSLTransportLayer transportLayer = new SSLTransportLayer(id, key,
-                sslFactory.createSSLEngine(socketChannel.socket().getInetAddress().getHostName(),
-                                           socketChannel.socket().getPort()));
-        transportLayer.startHandshake();
-        return transportLayer;
+        return SSLTransportLayer.create(id, key,
+            sslFactory.createSSLEngine(socketChannel.socket().getInetAddress().getHostName(),
+            socketChannel.socket().getPort()));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java
index 35ea9aa..e7afa02 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java
@@ -58,7 +58,14 @@ public class SSLTransportLayer implements TransportLayer {
     private ByteBuffer appReadBuffer;
     private ByteBuffer emptyBuf = ByteBuffer.allocate(0);
 
-    public SSLTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine) throws IOException {
+    public static SSLTransportLayer create(String channelId, SelectionKey key, SSLEngine sslEngine) throws IOException {
+        SSLTransportLayer transportLayer = new SSLTransportLayer(channelId, key, sslEngine);
+        transportLayer.startHandshake();
+        return transportLayer;
+    }
+
+    // Prefer `create`, only use this in tests
+    SSLTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine) throws IOException {
         this.channelId = channelId;
         this.key = key;
         this.socketChannel = (SocketChannel) key.channel();

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
new file mode 100644
index 0000000..53953c5
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.security.JaasUtils;
+import org.apache.kafka.common.security.auth.PrincipalBuilder;
+import org.apache.kafka.common.security.kerberos.KerberosNameParser;
+import org.apache.kafka.common.security.kerberos.LoginManager;
+import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator;
+import org.apache.kafka.common.security.authenticator.SaslServerAuthenticator;
+import org.apache.kafka.common.security.ssl.SSLFactory;
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.config.SSLConfigs;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.KafkaException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SaslChannelBuilder implements ChannelBuilder {
+    private static final Logger log = LoggerFactory.getLogger(SaslChannelBuilder.class);
+
+    private final SecurityProtocol securityProtocol;
+    private final Mode mode;
+    private final LoginType loginType;
+
+    private LoginManager loginManager;
+    private PrincipalBuilder principalBuilder;
+    private SSLFactory sslFactory;
+    private Map<String, ?> configs;
+    private KerberosNameParser kerberosNameParser;
+
+    public SaslChannelBuilder(Mode mode, LoginType loginType, SecurityProtocol securityProtocol) {
+        this.mode = mode;
+        this.loginType = loginType;
+        this.securityProtocol = securityProtocol;
+    }
+
+    public void configure(Map<String, ?> configs) throws KafkaException {
+        try {
+            this.configs = configs;
+            this.loginManager = LoginManager.acquireLoginManager(loginType, configs);
+            this.principalBuilder = (PrincipalBuilder) Utils.newInstance((Class<?>) configs.get(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG));
+            this.principalBuilder.configure(configs);
+
+            String defaultRealm;
+            try {
+                defaultRealm = JaasUtils.defaultRealm();
+            } catch (Exception ke) {
+                defaultRealm = "";
+            }
+            kerberosNameParser = new KerberosNameParser(defaultRealm, (List<String>) configs.get(SaslConfigs.AUTH_TO_LOCAL));
+
+            if (this.securityProtocol == SecurityProtocol.SASL_SSL) {
+                this.sslFactory = new SSLFactory(mode);
+                this.sslFactory.configure(this.configs);
+            }
+        } catch (Exception e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException {
+        try {
+            SocketChannel socketChannel = (SocketChannel) key.channel();
+            TransportLayer transportLayer = buildTransportLayer(id, key, socketChannel);
+            Authenticator authenticator;
+            if (mode == Mode.SERVER)
+                authenticator = new SaslServerAuthenticator(id, loginManager.subject(), kerberosNameParser);
+            else
+                authenticator = new SaslClientAuthenticator(id, loginManager.subject(), loginManager.serviceName(),
+                        socketChannel.socket().getInetAddress().getHostName(), kerberosNameParser);
+            authenticator.configure(transportLayer, this.principalBuilder, this.configs);
+            return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
+        } catch (Exception e) {
+            log.info("Failed to create channel due to ", e);
+            throw new KafkaException(e);
+        }
+    }
+
+    public void close()  {
+        this.principalBuilder.close();
+        this.loginManager.release();
+    }
+
+    protected TransportLayer buildTransportLayer(String id, SelectionKey key, SocketChannel socketChannel) throws IOException {
+        if (this.securityProtocol == SecurityProtocol.SASL_SSL) {
+            return SSLTransportLayer.create(id, key,
+                sslFactory.createSSLEngine(socketChannel.socket().getInetAddress().getHostName(),
+                socketChannel.socket().getPort()));
+        } else {
+            return new PlaintextTransportLayer(key);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index e1e5b4a..34de616 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -200,6 +200,7 @@ public class Selector implements Selectable {
             log.error("Exception closing nioSelector:", se);
         }
         sensors.close();
+        channelBuilder.close();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
index ff7a3bf..591fb8d 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
@@ -74,8 +74,7 @@ public interface TransportLayer extends ScatteringByteChannel, GatheringByteChan
     boolean hasPendingWrites();
 
     /**
-     * returns SSLSession.getPeerPrinicpal if SSLTransportLayer used
-     * for non-secure returns a "ANONYMOUS" as the peerPrincipal
+     * Returns `SSLSession.getPeerPrincipal` if SSLTransportLayer is used and `KakfaPrincipal.ANONYMOUS` otherwise.
      */
     Principal peerPrincipal() throws IOException;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
index a624741..70130a4 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
@@ -26,6 +26,10 @@ public enum SecurityProtocol {
     PLAINTEXT(0, "PLAINTEXT"),
     /** SSL channel */
     SSL(1, "SSL"),
+    /** SASL authenticated, non-encrypted channel */
+    SASL_PLAINTEXT(2, "SASL_PLAINTEXT"),
+    /** SASL authenticated, SSL channel */
+    SASL_SSL(3, "SASL_SSL"),
     /** Currently identical to PLAINTEXT and used for testing only. We may implement extra instrumentation when testing channel code. */
     TRACE(Short.MAX_VALUE, "TRACE");
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java b/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java
index ce0be62..b8c870d 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java
@@ -14,26 +14,77 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.common.security;
 
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.AppConfigurationEntry;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.io.IOException;
 import java.io.File;
 import java.net.URI;
 import java.security.URIParameter;
-import javax.security.auth.login.Configuration;
+
 import org.apache.kafka.common.KafkaException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class JaasUtils {
     private static final Logger LOG = LoggerFactory.getLogger(JaasUtils.class);
+    public static final String JAVA_LOGIN_CONFIG_PARAM = "java.security.auth.login.config";
+
     public static final String LOGIN_CONTEXT_SERVER = "KafkaServer";
     public static final String LOGIN_CONTEXT_CLIENT = "KafkaClient";
     public static final String SERVICE_NAME = "serviceName";
-    public static final String JAVA_LOGIN_CONFIG_PARAM = "java.security.auth.login.config";
+
     public static final String ZK_SASL_CLIENT = "zookeeper.sasl.client";
     public static final String ZK_LOGIN_CONTEXT_NAME_KEY = "zookeeper.sasl.clientconfig";
 
+    /**
+     * Construct a JAAS configuration object per kafka jaas configuration file
+     * @param loginContextName
+     * @param key
+     * @return JAAS configuration object
+     */
+    public static String jaasConfig(String loginContextName, String key) throws IOException {
+        AppConfigurationEntry[] configurationEntries = Configuration.getConfiguration().getAppConfigurationEntry(loginContextName);
+        if (configurationEntries == null) {
+            String errorMessage = "Could not find a '" + loginContextName + "' entry in this configuration.";
+            throw new IOException(errorMessage);
+        }
+
+        for (AppConfigurationEntry entry: configurationEntries) {
+            Object val = entry.getOptions().get(key);
+            if (val != null)
+                return (String) val;
+        }
+        return null;
+    }
+
+    public static String defaultRealm()
+        throws ClassNotFoundException, NoSuchMethodException,
+               IllegalArgumentException, IllegalAccessException,
+               InvocationTargetException {
+
+        //TODO Find a way to avoid using these proprietary classes as access to Java 9 will block access by default
+        //due to the Jigsaw module system
+
+        Object kerbConf;
+        Class<?> classRef;
+        Method getInstanceMethod;
+        Method getDefaultRealmMethod;
+        if (System.getProperty("java.vendor").contains("IBM")) {
+            classRef = Class.forName("com.ibm.security.krb5.internal.Config");
+        } else {
+            classRef = Class.forName("sun.security.krb5.Config");
+        }
+        getInstanceMethod = classRef.getMethod("getInstance", new Class[0]);
+        kerbConf = getInstanceMethod.invoke(classRef, new Object[0]);
+        getDefaultRealmMethod = classRef.getDeclaredMethod("getDefaultRealm",
+                                                           new Class[0]);
+        return (String) getDefaultRealmMethod.invoke(kerbConf, new Object[0]);
+    }
+
     public static boolean isZkSecurityEnabled(String loginConfigFile) {
         boolean isSecurityEnabled = false;
         boolean zkSaslEnabled = Boolean.getBoolean(System.getProperty(ZK_SASL_CLIENT, "true"));
@@ -61,4 +112,5 @@ public class JaasUtils {
 
         return isSecurityEnabled;
     }
-}
\ No newline at end of file
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/main/java/org/apache/kafka/common/security/auth/PrincipalBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/PrincipalBuilder.java b/clients/src/main/java/org/apache/kafka/common/security/auth/PrincipalBuilder.java
index b7cc378..99b6d21 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/auth/PrincipalBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/auth/PrincipalBuilder.java
@@ -17,10 +17,6 @@
 
 package org.apache.kafka.common.security.auth;
 
-/*
- * PrincipalBuilder for Authenticator
- */
-
 import org.apache.kafka.common.network.TransportLayer;
 import org.apache.kafka.common.network.Authenticator;
 import org.apache.kafka.common.KafkaException;
@@ -29,23 +25,24 @@ import org.apache.kafka.common.Configurable;
 import java.util.Map;
 import java.security.Principal;
 
+/*
+ * PrincipalBuilder for Authenticator
+ */
 public interface PrincipalBuilder extends Configurable {
 
     /**
-     * configure this class with give key-value pair
+     * Configures this class with given key-value pairs.
      */
-    public void configure(Map<String, ?> configs);
+    void configure(Map<String, ?> configs);
 
     /**
-     * Returns Principal
-     * @param TransportLayer
-     * @param Authenticator
+     * Returns Principal.
      */
     Principal buildPrincipal(TransportLayer transportLayer, Authenticator authenticator) throws KafkaException;
 
     /**
-     * Close this PrincipalBuilder
+     * Closes this instance.
      */
-    public void close() throws KafkaException;
+    void close() throws KafkaException;
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
new file mode 100644
index 0000000..3929160
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.authenticator;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.util.Arrays;
+import java.util.Map;
+
+import java.security.Principal;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+
+import javax.security.auth.Subject;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
+import org.apache.kafka.common.network.Authenticator;
+import org.apache.kafka.common.network.NetworkSend;
+import org.apache.kafka.common.network.NetworkReceive;
+import org.apache.kafka.common.network.TransportLayer;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.PrincipalBuilder;
+import org.apache.kafka.common.KafkaException;
+
+import org.apache.kafka.common.security.kerberos.KerberosNameParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SaslClientAuthenticator implements Authenticator {
+
+    public enum SaslState {
+        INITIAL, INTERMEDIATE, COMPLETE, FAILED
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(SaslClientAuthenticator.class);
+
+    private final Subject subject;
+    private final String servicePrincipal;
+    private final String host;
+    private final String node;
+    private final KerberosNameParser kerberosNameParser;
+
+    // assigned in `configure`
+    private SaslClient saslClient;
+    private String clientPrincipalName;
+    private TransportLayer transportLayer;
+
+    // buffers used in `authenticate`
+    private NetworkReceive netInBuffer;
+    private NetworkSend netOutBuffer;
+
+    private SaslState saslState = SaslState.INITIAL;
+
+    public SaslClientAuthenticator(String node, Subject subject, String servicePrincipal, String host, KerberosNameParser kerberosNameParser) throws IOException {
+        this.node = node;
+        this.subject = subject;
+        this.host = host;
+        this.servicePrincipal = servicePrincipal;
+        this.kerberosNameParser = kerberosNameParser;
+    }
+
+    public void configure(TransportLayer transportLayer, PrincipalBuilder principalBuilder, Map<String, ?> configs) throws KafkaException {
+        try {
+            this.transportLayer = transportLayer;
+
+            // determine client principal from subject.
+            Principal clientPrincipal = subject.getPrincipals().iterator().next();
+            this.clientPrincipalName = kerberosNameParser.parse(clientPrincipal.getName()).toString();
+            this.saslClient = createSaslClient();
+        } catch (Exception e) {
+            throw new KafkaException("Failed to configure SaslClientAuthenticator", e);
+        }
+    }
+
+    private SaslClient createSaslClient() {
+        try {
+            return Subject.doAs(subject, new PrivilegedExceptionAction<SaslClient>() {
+                public SaslClient run() throws SaslException {
+                    String[] mechs = {"GSSAPI"};
+                    LOG.debug("Creating SaslClient: client={};service={};serviceHostname={};mechs={}",
+                        clientPrincipalName, servicePrincipal, host, Arrays.toString(mechs));
+                    return Sasl.createSaslClient(mechs, clientPrincipalName, servicePrincipal, host, null,
+                            new ClientCallbackHandler());
+                }
+            });
+        } catch (Exception e) {
+            throw new KafkaException("Failed to create SASL client", e);
+        }
+    }
+
+    /**
+     * Sends an empty message to the server to initiate the authentication process. It then evaluates server challenges
+     * via `SaslClient.evaluateChallenge` and returns client responses until authentication succeeds or fails.
+     *
+     * The messages are sent and received as size delimited bytes that consists of a 4 byte network-ordered size N
+     * followed by N bytes representing the opaque payload.
+     */
+    public void authenticate() throws IOException {
+        if (netOutBuffer != null && !flushNetOutBufferAndUpdateInterestOps())
+            return;
+
+        switch (saslState) {
+            case INITIAL:
+                sendSaslToken(new byte[0]);
+                saslState = SaslState.INTERMEDIATE;
+                break;
+            case INTERMEDIATE:
+                if (netInBuffer == null) netInBuffer = new NetworkReceive(node);
+                netInBuffer.readFrom(transportLayer);
+                if (netInBuffer.complete()) {
+                    netInBuffer.payload().rewind();
+                    byte[] serverToken = new byte[netInBuffer.payload().remaining()];
+                    netInBuffer.payload().get(serverToken, 0, serverToken.length);
+                    netInBuffer = null; // reset the networkReceive as we read all the data.
+                    sendSaslToken(serverToken);
+                }
+                if (saslClient.isComplete()) {
+                    saslState = SaslState.COMPLETE;
+                    transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
+                }
+                break;
+            case COMPLETE:
+                break;
+            case FAILED:
+                throw new IOException("SASL handshake failed");
+        }
+    }
+
+    private void sendSaslToken(byte[] serverToken) throws IOException {
+        if (!saslClient.isComplete()) {
+            try {
+                byte[] saslToken = createSaslToken(serverToken);
+                if (saslToken != null) {
+                    netOutBuffer = new NetworkSend(node, ByteBuffer.wrap(saslToken));
+                    flushNetOutBufferAndUpdateInterestOps();
+                }
+            } catch (SaslException se) {
+                saslState = SaslState.FAILED;
+                throw new IOException("Failed to authenticate using SASL " + se);
+            }
+        }
+    }
+
+    private boolean flushNetOutBufferAndUpdateInterestOps() throws IOException {
+        boolean flushedCompletely = flushNetOutBuffer();
+        if (flushedCompletely)
+            transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
+        else
+            transportLayer.addInterestOps(SelectionKey.OP_WRITE);
+        return flushedCompletely;
+    }
+
+    public Principal principal() {
+        return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, clientPrincipalName);
+    }
+
+    public boolean complete() {
+        return saslState == SaslState.COMPLETE;
+    }
+
+    public void close() throws IOException {
+        saslClient.dispose();
+    }
+
+    private byte[] createSaslToken(final byte[] saslToken) throws SaslException {
+        if (saslToken == null) {
+            throw new SaslException("Error in authenticating with a Kafka Broker: the kafka broker saslToken is null.");
+        }
+
+        try {
+            return Subject.doAs(subject, new PrivilegedExceptionAction<byte[]>() {
+                public byte[] run() throws SaslException {
+                    return saslClient.evaluateChallenge(saslToken);
+                }
+            });
+        } catch (PrivilegedActionException e) {
+            String error = "An error: (" + e + ") occurred when evaluating Kafka Brokers " +
+                      " received SASL token.";
+            // Try to provide hints to use about what went wrong so they can fix their configuration.
+            // TODO: introspect about e: look for GSS information.
+            final String unknownServerErrorText =
+                "(Mechanism level: Server not found in Kerberos database (7) - UNKNOWN_SERVER)";
+            if (e.toString().indexOf(unknownServerErrorText) > -1) {
+                error += " This may be caused by Java's being unable to resolve the Kafka Broker's" +
+                    " hostname correctly. You may want to try to adding" +
+                    " '-Dsun.net.spi.nameservice.provider.1=dns,sun' to your client's JVMFLAGS environment." +
+                    " Users must configure FQDN of kafka brokers when authenticating using SASL and" +
+                    " `socketChannel.socket().getInetAddress().getHostName()` must match the hostname in `principal/hostname@realm`";
+            }
+            error += " Kafka Client will go to AUTH_FAILED state.";
+            throw new SaslException(error);
+        }
+    }
+
+    private boolean flushNetOutBuffer() throws IOException {
+        if (!netOutBuffer.completed()) {
+            netOutBuffer.writeTo(transportLayer);
+        }
+        return netOutBuffer.completed();
+    }
+
+    public static class ClientCallbackHandler implements CallbackHandler {
+
+        public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
+            for (Callback callback : callbacks) {
+                if (callback instanceof NameCallback) {
+                    NameCallback nc = (NameCallback) callback;
+                    nc.setName(nc.getDefaultName());
+                } else if (callback instanceof PasswordCallback) {
+                    // Call `setPassword` once we support obtaining a password from the user and update message below
+                    LOG.warn("Could not login: the client is being asked for a password, but the Kafka" +
+                             " client code does not currently support obtaining a password from the user." +
+                             " Make sure -Djava.security.auth.login.config property passed to JVM and" +
+                             " the client is configured to use a ticket cache (using" +
+                             " the JAAS configuration setting 'useTicketCache=true)'. Make sure you are using" +
+                             " FQDN of the Kafka broker you are trying to connect to.");
+                } else if (callback instanceof RealmCallback) {
+                    RealmCallback rc = (RealmCallback) callback;
+                    rc.setText(rc.getDefaultText());
+                } else if (callback instanceof AuthorizeCallback) {
+                    AuthorizeCallback ac = (AuthorizeCallback) callback;
+                    String authId = ac.getAuthenticationID();
+                    String authzId = ac.getAuthorizationID();
+                    ac.setAuthorized(authId.equals(authzId));
+                    if (ac.isAuthorized())
+                        ac.setAuthorizedID(authzId);
+                } else {
+                    throw new UnsupportedCallbackException(callback, "Unrecognized SASL ClientCallback");
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
new file mode 100644
index 0000000..d06a22a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.authenticator;
+
+import java.io.IOException;
+import java.util.Map;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+
+import java.security.Principal;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+
+import javax.security.auth.login.Configuration;
+import javax.security.auth.Subject;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslServer;
+import javax.security.sasl.SaslException;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.security.kerberos.KerberosName;
+import org.apache.kafka.common.security.kerberos.KerberosNameParser;
+import org.ietf.jgss.GSSContext;
+import org.ietf.jgss.GSSCredential;
+import org.ietf.jgss.GSSException;
+import org.ietf.jgss.GSSManager;
+import org.ietf.jgss.GSSName;
+import org.ietf.jgss.Oid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.network.Authenticator;
+import org.apache.kafka.common.network.NetworkSend;
+import org.apache.kafka.common.network.NetworkReceive;
+import org.apache.kafka.common.network.TransportLayer;
+import org.apache.kafka.common.security.auth.PrincipalBuilder;
+
+public class SaslServerAuthenticator implements Authenticator {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SaslServerAuthenticator.class);
+
+    private final SaslServer saslServer;
+    private final Subject subject;
+    private final String node;
+    private final KerberosNameParser kerberosNameParser;
+
+    // assigned in `configure`
+    private TransportLayer transportLayer;
+
+    // buffers used in `authenticate`
+    private NetworkReceive netInBuffer;
+    private NetworkSend netOutBuffer;
+
+    public SaslServerAuthenticator(String node, final Subject subject, KerberosNameParser kerberosNameParser) throws IOException {
+        if (subject == null)
+            throw new IllegalArgumentException("subject cannot be null");
+        if (subject.getPrincipals().isEmpty())
+            throw new IllegalArgumentException("subject must have at least one principal");
+        this.node = node;
+        this.subject = subject;
+        this.kerberosNameParser = kerberosNameParser;
+        saslServer = createSaslServer();
+    }
+
+    public void configure(TransportLayer transportLayer, PrincipalBuilder principalBuilder, Map<String, ?> configs) {
+        this.transportLayer = transportLayer;
+    }
+
+    private SaslServer createSaslServer() throws IOException {
+        // server is using a JAAS-authenticated subject: determine service principal name and hostname from kafka server's subject.
+        final SaslServerCallbackHandler saslServerCallbackHandler = new SaslServerCallbackHandler(
+                Configuration.getConfiguration(), kerberosNameParser);
+        final Principal servicePrincipal = subject.getPrincipals().iterator().next();
+        KerberosName kerberosName;
+        try {
+            kerberosName = kerberosNameParser.parse(servicePrincipal.getName());
+        } catch (IllegalArgumentException e) {
+            throw new KafkaException("Principal has name with unexpected format " + servicePrincipal);
+        }
+        final String servicePrincipalName = kerberosName.serviceName();
+        final String serviceHostname = kerberosName.hostName();
+
+        final String mech = "GSSAPI";
+
+        LOG.debug("Creating SaslServer for {} with mechanism {}", kerberosName, mech);
+
+        // As described in http://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/jgss-features.html:
+        // "To enable Java GSS to delegate to the native GSS library and its list of native mechanisms,
+        // set the system property "sun.security.jgss.native" to true"
+        // "In addition, when performing operations as a particular Subject, for example, Subject.doAs(...)
+        // or Subject.doAsPrivileged(...), the to-be-used GSSCredential should be added to Subject's
+        // private credential set. Otherwise, the GSS operations will fail since no credential is found."
+        boolean usingNativeJgss = Boolean.getBoolean("sun.security.jgss.native");
+        if (usingNativeJgss) {
+            try {
+                GSSManager manager = GSSManager.getInstance();
+                // This Oid is used to represent the Kerberos version 5 GSS-API mechanism. It is defined in
+                // RFC 1964.
+                Oid krb5Mechanism = new Oid("1.2.840.113554.1.2.2");
+                GSSName gssName = manager.createName(servicePrincipalName + "@" + serviceHostname, GSSName.NT_HOSTBASED_SERVICE);
+                GSSCredential cred = manager.createCredential(gssName, GSSContext.INDEFINITE_LIFETIME, krb5Mechanism, GSSCredential.ACCEPT_ONLY);
+                subject.getPrivateCredentials().add(cred);
+            } catch (GSSException ex) {
+                LOG.warn("Cannot add private credential to subject; clients authentication may fail", ex);
+            }
+        }
+
+        try {
+            return Subject.doAs(subject, new PrivilegedExceptionAction<SaslServer>() {
+                public SaslServer run() {
+                    try {
+                        return Sasl.createSaslServer(mech, servicePrincipalName, serviceHostname, null, saslServerCallbackHandler);
+                    } catch (SaslException e) {
+                        throw new KafkaException("Kafka Server failed to create a SaslServer to interact with a client during session authentication", e);
+                    }
+                }
+            });
+        } catch (PrivilegedActionException e) {
+            throw new KafkaException("Kafka Broker experienced a PrivilegedActionException exception while creating a SaslServer using a JAAS principal context", e);
+        }
+    }
+
+    /**
+     * Evaluates client responses via `SaslServer.evaluateResponse` and returns the issued challenge to the client until
+     * authentication succeeds or fails.
+     *
+     * The messages are sent and received as size delimited bytes that consists of a 4 byte network-ordered size N
+     * followed by N bytes representing the opaque payload.
+     */
+    public void authenticate() throws IOException {
+        if (netOutBuffer != null && !flushNetOutBufferAndUpdateInterestOps())
+            return;
+
+        if (saslServer.isComplete()) {
+            transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
+            return;
+        }
+
+        if (netInBuffer == null) netInBuffer = new NetworkReceive(node);
+
+        netInBuffer.readFrom(transportLayer);
+
+        if (netInBuffer.complete()) {
+            netInBuffer.payload().rewind();
+            byte[] clientToken = new byte[netInBuffer.payload().remaining()];
+            netInBuffer.payload().get(clientToken, 0, clientToken.length);
+            netInBuffer = null; // reset the networkReceive as we read all the data.
+            try {
+                byte[] response = saslServer.evaluateResponse(clientToken);
+                if (response != null) {
+                    netOutBuffer = new NetworkSend(node, ByteBuffer.wrap(response));
+                    flushNetOutBufferAndUpdateInterestOps();
+                }
+            } catch (Exception e) {
+                throw new IOException(e);
+            }
+        }
+    }
+
+    public Principal principal() {
+        return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, saslServer.getAuthorizationID());
+    }
+
+    public boolean complete() {
+        return saslServer.isComplete();
+    }
+
+    public void close() throws IOException {
+        saslServer.dispose();
+    }
+
+    private boolean flushNetOutBufferAndUpdateInterestOps() throws IOException {
+        boolean flushedCompletely = flushNetOutBuffer();
+        if (flushedCompletely)
+            transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
+        else
+            transportLayer.addInterestOps(SelectionKey.OP_WRITE);
+        return flushedCompletely;
+    }
+
+    private boolean flushNetOutBuffer() throws IOException {
+        if (!netOutBuffer.completed())
+            netOutBuffer.writeTo(transportLayer);
+        return netOutBuffer.completed();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerCallbackHandler.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerCallbackHandler.java
new file mode 100644
index 0000000..8474faf
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerCallbackHandler.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.authenticator;
+
+import java.io.IOException;
+
+import org.apache.kafka.common.security.kerberos.KerberosNameParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+
+import org.apache.kafka.common.security.kerberos.KerberosName;
+import org.apache.kafka.common.security.JaasUtils;
+
+public class SaslServerCallbackHandler implements CallbackHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(SaslServerCallbackHandler.class);
+    private final KerberosNameParser kerberosNameParser;
+
+    public SaslServerCallbackHandler(Configuration configuration, KerberosNameParser kerberosNameParser) throws IOException {
+        AppConfigurationEntry[] configurationEntries = configuration.getAppConfigurationEntry(JaasUtils.LOGIN_CONTEXT_SERVER);
+        if (configurationEntries == null)
+            throw new IOException("Could not find a 'KafkaServer' entry in this configuration: Kafka Server cannot start.");
+        this.kerberosNameParser = kerberosNameParser;
+    }
+
+    public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
+        for (Callback callback : callbacks) {
+            if (callback instanceof RealmCallback) {
+                handleRealmCallback((RealmCallback) callback);
+            } else if (callback instanceof AuthorizeCallback) {
+                handleAuthorizeCallback((AuthorizeCallback) callback);
+            }
+        }
+    }
+
+    private void handleRealmCallback(RealmCallback rc) {
+        LOG.trace("Client supplied realm: {} ", rc.getDefaultText());
+        rc.setText(rc.getDefaultText());
+    }
+
+    private void handleAuthorizeCallback(AuthorizeCallback ac) {
+        String authenticationID = ac.getAuthenticationID();
+        String authorizationID = ac.getAuthorizationID();
+
+        LOG.info("Successfully authenticated client: authenticationID={}; authorizationID={}.", authenticationID,
+                authorizationID);
+        ac.setAuthorized(true);
+
+        KerberosName kerberosName = kerberosNameParser.parse(authenticationID);
+        try {
+            String userName = kerberosName.shortName();
+            LOG.info("Setting authorizedID: {}", userName);
+            ac.setAuthorizedID(userName);
+        } catch (IOException e) {
+            LOG.error("Failed to set name based on Kerberos authentication rules.");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosName.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosName.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosName.java
new file mode 100644
index 0000000..aef10db
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosName.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.kerberos;
+
+import java.io.IOException;
+import java.util.List;
+
+public class KerberosName {
+
+    /** The first component of the name */
+    private final String serviceName;
+    /** The second component of the name. It may be null. */
+    private final String hostName;
+    /** The realm of the name. */
+    private final String realm;
+
+    /* Rules for the translation of the principal name into an operating system name */
+    private final List<KerberosRule> authToLocalRules;
+
+    /**
+     * Creates an instance of `KerberosName` with the provided parameters.
+     */
+    public KerberosName(String serviceName, String hostName, String realm, List<KerberosRule> authToLocalRules) {
+        if (serviceName == null)
+            throw new IllegalArgumentException("serviceName must not be null");
+        this.serviceName = serviceName;
+        this.hostName = hostName;
+        this.realm = realm;
+        this.authToLocalRules = authToLocalRules;
+    }
+
+    /**
+     * Put the name back together from the parts.
+     */
+    @Override
+    public String toString() {
+        StringBuilder result = new StringBuilder();
+        result.append(serviceName);
+        if (hostName != null) {
+            result.append('/');
+            result.append(hostName);
+        }
+        if (realm != null) {
+            result.append('@');
+            result.append(realm);
+        }
+        return result.toString();
+    }
+
+    /**
+     * Get the first component of the name.
+     * @return the first section of the Kerberos principal name
+     */
+    public String serviceName() {
+        return serviceName;
+    }
+
+    /**
+     * Get the second component of the name.
+     * @return the second section of the Kerberos principal name, and may be null
+     */
+    public String hostName() {
+        return hostName;
+    }
+
+    /**
+     * Get the realm of the name.
+     * @return the realm of the name, may be null
+     */
+    public String realm() {
+        return realm;
+    }
+
+    /**
+     * Get the translation of the principal name into an operating system
+     * user name.
+     * @return the short name
+     * @throws IOException
+     */
+    public String shortName() throws IOException {
+        String[] params;
+        if (hostName == null) {
+            // if it is already simple, just return it
+            if (realm == null)
+                return serviceName;
+            params = new String[]{realm, serviceName};
+        } else {
+            params = new String[]{realm, serviceName, hostName};
+        }
+        for (KerberosRule r : authToLocalRules) {
+            String result = r.apply(params);
+            if (result != null)
+                return result;
+        }
+        throw new NoMatchingRule("No rules applied to " + toString());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosNameParser.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosNameParser.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosNameParser.java
new file mode 100644
index 0000000..95eb170
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosNameParser.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.kerberos;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * This class implements parsing and handling of Kerberos principal names. In
+ * particular, it splits them apart and translates them down into local
+ * operating system names.
+ */
+public class KerberosNameParser {
+
+    /**
+     * A pattern that matches a Kerberos name with at most 3 components.
+     */
+    private static final Pattern NAME_PARSER = Pattern.compile("([^/@]*)(/([^/@]*))?@([^/@]*)");
+
+    /**
+     * A pattern for parsing a auth_to_local rule.
+     */
+    private static final Pattern RULE_PARSER = Pattern.compile("((DEFAULT)|(RULE:\\[(\\d*):([^\\]]*)](\\(([^)]*)\\))?(s/([^/]*)/([^/]*)/(g)?)?))");
+
+    /**
+     * The list of translation rules.
+     */
+    private final List<KerberosRule> authToLocalRules;
+
+    public KerberosNameParser(String defaultRealm, List<String> authToLocalRules) {
+        this.authToLocalRules = parseRules(defaultRealm, authToLocalRules);
+    }
+
+    /**
+     * Create a name from the full Kerberos principal name.
+     */
+    public KerberosName parse(String principalName) {
+        Matcher match = NAME_PARSER.matcher(principalName);
+        if (!match.matches()) {
+            if (principalName.contains("@")) {
+                throw new IllegalArgumentException("Malformed Kerberos name: " + principalName);
+            } else {
+                return new KerberosName(principalName, null, null, authToLocalRules);
+            }
+        } else {
+            return new KerberosName(match.group(1), match.group(3), match.group(4), authToLocalRules);
+        }
+    }
+
+    private static List<KerberosRule> parseRules(String defaultRealm, List<String> rules) {
+        List<KerberosRule> result = new ArrayList<>();
+        for (String rule : rules) {
+            Matcher matcher = RULE_PARSER.matcher(rule);
+            if (!matcher.lookingAt()) {
+                throw new IllegalArgumentException("Invalid rule: " + rule);
+            }
+            if (rule.length() != matcher.end())
+                throw new IllegalArgumentException("Invalid rule: `" + rule + "`, unmatched substring: `" + rule.substring(matcher.end()) + "`");
+            if (matcher.group(2) != null) {
+                result.add(new KerberosRule(defaultRealm));
+            } else {
+                result.add(new KerberosRule(defaultRealm,
+                        Integer.parseInt(matcher.group(4)),
+                        matcher.group(5),
+                        matcher.group(7),
+                        matcher.group(9),
+                        matcher.group(10),
+                        "g".equals(matcher.group(11))));
+
+            }
+        }
+        return result;
+    }
+
+    public static class BadFormatString extends IOException {
+        BadFormatString(String msg) {
+            super(msg);
+        }
+        BadFormatString(String msg, Throwable err) {
+            super(msg, err);
+        }
+    }
+
+}