You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/30 00:05:38 UTC

[24/50] [abbrv] kafka git commit: KAFKA-3149; Extend SASL implementation to support more mechanisms

KAFKA-3149; Extend SASL implementation to support more mechanisms

Code changes corresponding to KIP-43 to enable review of the KIP.

Author: Rajini Sivaram <ra...@googlemail.com>

Reviewers: Jun Rao <ju...@apache.org>, Ismael Juma <is...@juma.me.uk>

Closes #812 from rajinisivaram/KAFKA-3149


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5b375d7b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5b375d7b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5b375d7b

Branch: refs/heads/0.10.0
Commit: 5b375d7bf9b26aaeed06bac2dc5de3f8214cbad4
Parents: cd427c9
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Tue Apr 26 16:56:42 2016 -0700
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Tue Apr 26 16:56:42 2016 -0700

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |   7 +
 .../org/apache/kafka/clients/ClientUtils.java   |   4 +-
 .../org/apache/kafka/clients/NetworkClient.java |  20 +-
 .../apache/kafka/common/config/SaslConfigs.java |  15 +-
 .../common/errors/AuthenticationException.java  |  27 ++
 .../errors/IllegalSaslStateException.java       |  27 ++
 .../UnsupportedSaslMechanismException.java      |  27 ++
 .../kafka/common/network/ChannelBuilders.java   |  14 +-
 .../common/network/SaslChannelBuilder.java      |  36 +-
 .../apache/kafka/common/protocol/ApiKeys.java   |   3 +-
 .../apache/kafka/common/protocol/Errors.java    |   8 +-
 .../apache/kafka/common/protocol/Protocol.java  |  13 +
 .../kafka/common/requests/AbstractRequest.java  |   2 +
 .../kafka/common/requests/ResponseSend.java     |   2 +-
 .../common/requests/SaslHandshakeRequest.java   |  83 ++++
 .../common/requests/SaslHandshakeResponse.java  |  85 ++++
 .../security/auth/AuthCallbackHandler.java      |  46 +++
 .../kafka/common/security/auth/Login.java       |  57 +++
 .../security/authenticator/AbstractLogin.java   | 108 +++++
 .../security/authenticator/DefaultLogin.java    |  32 ++
 .../security/authenticator/LoginManager.java    | 112 ++++++
 .../authenticator/SaslClientAuthenticator.java  | 242 ++++++++----
 .../SaslClientCallbackHandler.java              |  94 +++++
 .../authenticator/SaslServerAuthenticator.java  | 195 +++++++--
 .../SaslServerCallbackHandler.java              |  22 +-
 .../common/security/kerberos/KerberosLogin.java | 392 +++++++++++++++++++
 .../kafka/common/security/kerberos/Login.java   | 379 ------------------
 .../common/security/kerberos/LoginManager.java  | 130 ------
 .../common/security/plain/PlainLoginModule.java |  66 ++++
 .../common/security/plain/PlainSaslServer.java  | 170 ++++++++
 .../security/plain/PlainSaslServerProvider.java |  38 ++
 .../common/requests/RequestResponseTest.java    |  11 +-
 .../org/apache/kafka/test/TestSslUtils.java     |   3 +
 .../main/scala/kafka/common/ErrorMapping.scala  |   2 +
 .../controller/ControllerChannelManager.scala   |  10 +-
 .../main/scala/kafka/network/SocketServer.scala |   2 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |   8 +
 .../main/scala/kafka/server/KafkaConfig.scala   |  18 +-
 .../main/scala/kafka/server/KafkaServer.scala   |   9 +-
 .../kafka/server/ReplicaFetcherThread.scala     |  10 +-
 .../kafka/api/BaseConsumerTest.scala            |   9 +-
 .../kafka/api/BaseProducerSendTest.scala        |   4 +-
 .../kafka/api/EndToEndAuthorizationTest.scala   |   4 +-
 .../kafka/api/IntegrationTestHarness.scala      |   8 +-
 .../kafka/api/PlaintextConsumerTest.scala       |   2 +-
 .../api/SaslMultiMechanismConsumerTest.scala    |  86 ++++
 .../api/SaslPlainPlaintextConsumerTest.scala    |  27 ++
 .../scala/integration/kafka/api/SaslSetup.scala |  36 +-
 .../integration/kafka/api/SaslTestHarness.scala |  22 +-
 .../integration/KafkaServerTestHarness.scala    |   4 +-
 .../unit/kafka/server/KafkaConfigTest.scala     |   2 +
 .../scala/unit/kafka/utils/JaasTestUtils.scala  | 122 ++++--
 .../test/scala/unit/kafka/utils/TestUtils.scala |  40 +-
 53 files changed, 2171 insertions(+), 724 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 051c8d1..e94698c 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -69,6 +69,13 @@
       <allow pkg="org.apache.kafka.common.annotation" />
       <allow pkg="org.apache.kafka.common.network" />
       <allow pkg="org.apache.kafka.common.config" />
+      <subpackage name="authenticator">
+        <allow pkg="org.apache.kafka.common.protocol" />
+        <allow pkg="org.apache.kafka.common.protocol.types" />
+        <allow pkg="org.apache.kafka.common.requests" />
+        <allow pkg="org.apache.kafka.common.errors" />
+        <allow pkg="org.apache.kafka.clients" />
+      </subpackage>
     </subpackage>
 
     <subpackage name="protocol">

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
index 0201257..ad9c5d0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.network.Mode;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.network.ChannelBuilder;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.SaslConfigs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -76,7 +77,8 @@ public class ClientUtils {
         SecurityProtocol securityProtocol = SecurityProtocol.forName((String) configs.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
         if (!SecurityProtocol.nonTestingValues().contains(securityProtocol))
             throw new ConfigException("Invalid SecurityProtocol " + securityProtocol);
-        return ChannelBuilders.create(securityProtocol, Mode.CLIENT, LoginType.CLIENT, configs);
+        String clientSaslMechanism = (String) configs.get(SaslConfigs.SASL_MECHANISM);
+        return ChannelBuilders.create(securityProtocol, Mode.CLIENT, LoginType.CLIENT, configs, clientSaslMechanism, true);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index d2eaace..cc5dc6f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -373,6 +374,16 @@ public class NetworkClient implements KafkaClient {
         return found;
     }
 
+    public static Struct parseResponse(ByteBuffer responseBuffer, RequestHeader requestHeader) {
+        ResponseHeader responseHeader = ResponseHeader.parse(responseBuffer);
+        // Always expect the response version id to be the same as the request version id
+        short apiKey = requestHeader.apiKey();
+        short apiVer = requestHeader.apiVersion();
+        Struct responseBody = ProtoUtils.responseSchema(apiKey, apiVer).read(responseBuffer);
+        correlate(requestHeader, responseHeader);
+        return responseBody;
+    }
+
     /**
      * Post process disconnection of a node
      *
@@ -437,12 +448,7 @@ public class NetworkClient implements KafkaClient {
         for (NetworkReceive receive : this.selector.completedReceives()) {
             String source = receive.source();
             ClientRequest req = inFlightRequests.completeNext(source);
-            ResponseHeader header = ResponseHeader.parse(receive.payload());
-            // Always expect the response version id to be the same as the request version id
-            short apiKey = req.request().header().apiKey();
-            short apiVer = req.request().header().apiVersion();
-            Struct body = ProtoUtils.responseSchema(apiKey, apiVer).read(receive.payload());
-            correlate(req.request().header(), header);
+            Struct body = parseResponse(receive.payload(), req.request().header());
             if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
                 responses.add(new ClientResponse(req, now, false, body));
         }
@@ -477,7 +483,7 @@ public class NetworkClient implements KafkaClient {
     /**
      * Validate that the response corresponds to the request we expect or else explode
      */
-    private void correlate(RequestHeader requestHeader, ResponseHeader responseHeader) {
+    private static void correlate(RequestHeader requestHeader, ResponseHeader responseHeader) {
         if (requestHeader.correlationId() != responseHeader.correlationId())
             throw new IllegalStateException("Correlation id for response (" + responseHeader.correlationId()
                     + ") does not match request (" + requestHeader.correlationId() + ")");

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
index d61838f..d3aa0d6 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
@@ -20,6 +20,17 @@ public class SaslConfigs {
     /*
      * NOTE: DO NOT CHANGE EITHER CONFIG NAMES AS THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE.
      */
+    /** SASL mechanism configuration - standard mechanism names are listed <a href="http://www.iana.org/assignments/sasl-mechanisms/sasl-mechanisms.xhtml">here</a>. */
+    public static final String SASL_MECHANISM = "sasl.mechanism";
+    public static final String SASL_MECHANISM_DOC = "SASL mechanism used for client connections. This may be any mechanism for which a security provider is available. GSSAPI is the default mechanism.";
+    public static final String GSSAPI_MECHANISM = "GSSAPI";
+    public static final String DEFAULT_SASL_MECHANISM = GSSAPI_MECHANISM;
+
+    public static final String SASL_ENABLED_MECHANISMS = "sasl.enabled.mechanisms";
+    public static final String SASL_ENABLED_MECHANISMS_DOC = "The list of SASL mechanisms enabled in the Kafka server. "
+        + "The list may contain any mechanism for which a security provider is available. "
+        + "Only GSSAPI is enabled by default.";
+    public static final List<String> DEFAULT_SASL_ENABLED_MECHANISMS = Collections.singletonList(GSSAPI_MECHANISM);
 
     public static final String SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name";
     public static final String SASL_KERBEROS_SERVICE_NAME_DOC = "The Kerberos principal name that Kafka runs as. "
@@ -54,7 +65,7 @@ public class SaslConfigs {
                 .define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, ConfigDef.Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC)
                 .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC)
                 .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC)
-                .define(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, ConfigDef.Type.LONG, SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC);
+                .define(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, ConfigDef.Type.LONG, SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC)
+                .define(SaslConfigs.SASL_MECHANISM, ConfigDef.Type.STRING, SaslConfigs.DEFAULT_SASL_MECHANISM, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_MECHANISM_DOC);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationException.java b/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationException.java
new file mode 100644
index 0000000..7b60e11
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationException.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class AuthenticationException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public AuthenticationException(String message) {
+        super(message);
+    }
+
+    public AuthenticationException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/errors/IllegalSaslStateException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/IllegalSaslStateException.java b/clients/src/main/java/org/apache/kafka/common/errors/IllegalSaslStateException.java
new file mode 100644
index 0000000..7fd008c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/IllegalSaslStateException.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class IllegalSaslStateException extends AuthenticationException {
+
+    private static final long serialVersionUID = 1L;
+
+    public IllegalSaslStateException(String message) {
+        super(message);
+    }
+
+    public IllegalSaslStateException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedSaslMechanismException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedSaslMechanismException.java b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedSaslMechanismException.java
new file mode 100644
index 0000000..289a09f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedSaslMechanismException.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class UnsupportedSaslMechanismException extends AuthenticationException {
+
+    private static final long serialVersionUID = 1L;
+
+    public UnsupportedSaslMechanismException(String message) {
+        super(message);
+    }
+
+    public UnsupportedSaslMechanismException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
index 669f269..2d6ba8a 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
@@ -31,10 +31,18 @@ public class ChannelBuilders {
      *             it is ignored otherwise
      * @param loginType the loginType, it must be non-null if `securityProtocol` is SASL_*; it is ignored otherwise
      * @param configs client/server configs
+     * @param clientSaslMechanism SASL mechanism if mode is CLIENT, ignored otherwise
+     * @param saslHandshakeRequestEnable flag to enable SASL handshake requests; disabled only for SASL
+     *             inter-broker connections with inter-broker protocol version < 0.10
      * @return the configured `ChannelBuilder`
      * @throws IllegalArgumentException if `mode` invariants described above is not maintained
      */
-    public static ChannelBuilder create(SecurityProtocol securityProtocol, Mode mode, LoginType loginType, Map<String, ?> configs) {
+    public static ChannelBuilder create(SecurityProtocol securityProtocol,
+                                        Mode mode,
+                                        LoginType loginType,
+                                        Map<String, ?> configs,
+                                        String clientSaslMechanism,
+                                        boolean saslHandshakeRequestEnable) {
         ChannelBuilder channelBuilder;
 
         switch (securityProtocol) {
@@ -47,7 +55,9 @@ public class ChannelBuilders {
                 requireNonNullMode(mode, securityProtocol);
                 if (loginType == null)
                     throw new IllegalArgumentException("`loginType` must be non-null if `securityProtocol` is `" + securityProtocol + "`");
-                channelBuilder = new SaslChannelBuilder(mode, loginType, securityProtocol);
+                if (mode == Mode.CLIENT && clientSaslMechanism == null)
+                    throw new IllegalArgumentException("`clientSaslMechanism` must be non-null in client mode if `securityProtocol` is `" + securityProtocol + "`");
+                channelBuilder = new SaslChannelBuilder(mode, loginType, securityProtocol, clientSaslMechanism, saslHandshakeRequestEnable);
                 break;
             case PLAINTEXT:
             case TRACE:

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index 0cd5bfe..a0464bc 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -21,13 +21,12 @@ import java.util.Map;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.security.JaasUtils;
 import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
-import org.apache.kafka.common.security.kerberos.LoginManager;
+import org.apache.kafka.common.security.authenticator.LoginManager;
 import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator;
 import org.apache.kafka.common.security.authenticator.SaslServerAuthenticator;
 import org.apache.kafka.common.security.ssl.SslFactory;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.KafkaException;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,24 +34,34 @@ public class SaslChannelBuilder implements ChannelBuilder {
     private static final Logger log = LoggerFactory.getLogger(SaslChannelBuilder.class);
 
     private final SecurityProtocol securityProtocol;
+    private final String clientSaslMechanism;
     private final Mode mode;
     private final LoginType loginType;
+    private final boolean handshakeRequestEnable;
 
     private LoginManager loginManager;
     private SslFactory sslFactory;
     private Map<String, ?> configs;
     private KerberosShortNamer kerberosShortNamer;
 
-    public SaslChannelBuilder(Mode mode, LoginType loginType, SecurityProtocol securityProtocol) {
+    public SaslChannelBuilder(Mode mode, LoginType loginType, SecurityProtocol securityProtocol, String clientSaslMechanism, boolean handshakeRequestEnable) {
         this.mode = mode;
         this.loginType = loginType;
         this.securityProtocol = securityProtocol;
+        this.handshakeRequestEnable = handshakeRequestEnable;
+        this.clientSaslMechanism = clientSaslMechanism;
     }
 
     public void configure(Map<String, ?> configs) throws KafkaException {
         try {
             this.configs = configs;
-            this.loginManager = LoginManager.acquireLoginManager(loginType, configs);
+            boolean hasKerberos;
+            if (mode == Mode.SERVER) {
+                List<String> enabledMechanisms = (List<String>) this.configs.get(SaslConfigs.SASL_ENABLED_MECHANISMS);
+                hasKerberos = enabledMechanisms == null || enabledMechanisms.contains(SaslConfigs.GSSAPI_MECHANISM);
+            } else {
+                hasKerberos = clientSaslMechanism.equals(SaslConfigs.GSSAPI_MECHANISM);
+            }
 
             String defaultRealm;
             try {
@@ -61,10 +70,13 @@ public class SaslChannelBuilder implements ChannelBuilder {
                 defaultRealm = "";
             }
 
-            @SuppressWarnings("unchecked")
-            List<String> principalToLocalRules = (List<String>) configs.get(SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES);
-            if (principalToLocalRules != null)
-                kerberosShortNamer = KerberosShortNamer.fromUnparsedRules(defaultRealm, principalToLocalRules);
+            if (hasKerberos) {
+                @SuppressWarnings("unchecked")
+                List<String> principalToLocalRules = (List<String>) configs.get(SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES);
+                if (principalToLocalRules != null)
+                    kerberosShortNamer = KerberosShortNamer.fromUnparsedRules(defaultRealm, principalToLocalRules);
+            }
+            this.loginManager = LoginManager.acquireLoginManager(loginType, hasKerberos, configs);
 
             if (this.securityProtocol == SecurityProtocol.SASL_SSL) {
                 // Disable SSL client authentication as we are using SASL authentication
@@ -82,10 +94,11 @@ public class SaslChannelBuilder implements ChannelBuilder {
             TransportLayer transportLayer = buildTransportLayer(id, key, socketChannel);
             Authenticator authenticator;
             if (mode == Mode.SERVER)
-                authenticator = new SaslServerAuthenticator(id, loginManager.subject(), kerberosShortNamer, maxReceiveSize);
+                authenticator = new SaslServerAuthenticator(id, loginManager.subject(), kerberosShortNamer,
+                        socketChannel.socket().getLocalAddress().getHostName(), maxReceiveSize);
             else
                 authenticator = new SaslClientAuthenticator(id, loginManager.subject(), loginManager.serviceName(),
-                        socketChannel.socket().getInetAddress().getHostName());
+                        socketChannel.socket().getInetAddress().getHostName(), clientSaslMechanism, handshakeRequestEnable);
             // Both authenticators don't use `PrincipalBuilder`, so we pass `null` for now. Reconsider if this changes.
             authenticator.configure(transportLayer, null, this.configs);
             return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
@@ -96,7 +109,8 @@ public class SaslChannelBuilder implements ChannelBuilder {
     }
 
     public void close()  {
-        this.loginManager.release();
+        if (this.loginManager != null)
+            this.loginManager.release();
     }
 
     protected TransportLayer buildTransportLayer(String id, SelectionKey key, SocketChannel socketChannel) throws IOException {

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index e8fd3d3..512a121 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -36,7 +36,8 @@ public enum ApiKeys {
     LEAVE_GROUP(13, "LeaveGroup"),
     SYNC_GROUP(14, "SyncGroup"),
     DESCRIBE_GROUPS(15, "DescribeGroups"),
-    LIST_GROUPS(16, "ListGroups");
+    LIST_GROUPS(16, "ListGroups"),
+    SASL_HANDSHAKE(17, "SaslHandshake");
 
     private static final ApiKeys[] ID_TO_TYPE;
     private static final int MIN_API_KEY = 0;

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 0f33516..9013399 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException;
 import org.apache.kafka.common.errors.GroupLoadInProgressException;
 import org.apache.kafka.common.errors.IllegalGenerationException;
+import org.apache.kafka.common.errors.IllegalSaslStateException;
 import org.apache.kafka.common.errors.InconsistentGroupProtocolException;
 import org.apache.kafka.common.errors.InvalidCommitOffsetSizeException;
 import org.apache.kafka.common.errors.InvalidFetchSizeException;
@@ -49,6 +50,7 @@ import org.apache.kafka.common.errors.RecordBatchTooLargeException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.ReplicaNotAvailableException;
 import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.UnknownMemberIdException;
@@ -130,7 +132,11 @@ public enum Errors {
     CLUSTER_AUTHORIZATION_FAILED(31,
             new ClusterAuthorizationException("Cluster authorization failed.")),
     INVALID_TIMESTAMP(32,
-            new InvalidTimestampException("The timestamp of the message is out of acceptable range."));
+            new InvalidTimestampException("The timestamp of the message is out of acceptable range.")),
+    UNSUPPORTED_SASL_MECHANISM(33,
+            new UnsupportedSaslMechanismException("The broker does not support the requested SASL mechanism.")),
+    ILLEGAL_SASL_STATE(34,
+            new IllegalSaslStateException("Request is not valid given the current SASL state."));
 
     private static final Logger log = LoggerFactory.getLogger(Errors.class);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 248b7ec..bf76557 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -718,6 +718,17 @@ public class Protocol {
     public static final Schema[] UPDATE_METADATA_REQUEST = new Schema[] {UPDATE_METADATA_REQUEST_V0, UPDATE_METADATA_REQUEST_V1, UPDATE_METADATA_REQUEST_V2};
     public static final Schema[] UPDATE_METADATA_RESPONSE = new Schema[] {UPDATE_METADATA_RESPONSE_V0, UPDATE_METADATA_RESPONSE_V1, UPDATE_METADATA_RESPONSE_V2};
 
+    /* SASL handshake api */
+    public static final Schema SASL_HANDSHAKE_REQUEST_V0 = new Schema(
+            new Field("mechanism", STRING, "SASL Mechanism chosen by the client."));
+
+    public static final Schema SASL_HANDSHAKE_RESPONSE_V0 = new Schema(
+            new Field("error_code", INT16),
+            new Field("enabled_mechanisms", new ArrayOf(Type.STRING), "Array of mechanisms enabled in the server."));
+
+    public static final Schema[] SASL_HANDSHAKE_REQUEST = new Schema[] {SASL_HANDSHAKE_REQUEST_V0};
+    public static final Schema[] SASL_HANDSHAKE_RESPONSE = new Schema[] {SASL_HANDSHAKE_RESPONSE_V0};
+
     /* an array of all requests and responses with all schema versions; a null value in the inner array means that the
      * particular version is not supported */
     public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
@@ -744,6 +755,7 @@ public class Protocol {
         REQUESTS[ApiKeys.SYNC_GROUP.id] = SYNC_GROUP_REQUEST;
         REQUESTS[ApiKeys.DESCRIBE_GROUPS.id] = DESCRIBE_GROUPS_REQUEST;
         REQUESTS[ApiKeys.LIST_GROUPS.id] = LIST_GROUPS_REQUEST;
+        REQUESTS[ApiKeys.SASL_HANDSHAKE.id] = SASL_HANDSHAKE_REQUEST;
 
         RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
         RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE;
@@ -762,6 +774,7 @@ public class Protocol {
         RESPONSES[ApiKeys.SYNC_GROUP.id] = SYNC_GROUP_RESPONSE;
         RESPONSES[ApiKeys.DESCRIBE_GROUPS.id] = DESCRIBE_GROUPS_RESPONSE;
         RESPONSES[ApiKeys.LIST_GROUPS.id] = LIST_GROUPS_RESPONSE;
+        RESPONSES[ApiKeys.SASL_HANDSHAKE.id] = SASL_HANDSHAKE_RESPONSE;
 
         /* set the maximum version of each api */
         for (ApiKeys api : ApiKeys.values())

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 5a40b7f..89c2ce1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -72,6 +72,8 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
                 return DescribeGroupsRequest.parse(buffer, versionId);
             case LIST_GROUPS:
                 return ListGroupsRequest.parse(buffer, versionId);
+            case SASL_HANDSHAKE:
+                return SaslHandshakeRequest.parse(buffer, versionId);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `getRequest`, the " +
                         "code should be updated to do so.", apiKey));

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java b/clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java
index 12b06d1..9494de7 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java
@@ -31,7 +31,7 @@ public class ResponseSend extends NetworkSend {
         this(destination, header, response.toStruct());
     }
 
-    private static ByteBuffer serialize(ResponseHeader header, Struct body) {
+    public static ByteBuffer serialize(ResponseHeader header, Struct body) {
         ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + body.sizeOf());
         header.writeTo(buffer);
         body.writeTo(buffer);

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
new file mode 100644
index 0000000..bddc9f0
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+
+/**
+ * Request from SASL client containing client SASL mechanism.
+ * <p/>
+ * For interoperability with Kafka 0.9.0.x, the mechanism flow may be omitted when using GSSAPI. Hence
+ * this request should not conflict with the first GSSAPI client packet. For GSSAPI, the first context
+ * establishment packet starts with byte 0x60 (APPLICATION-0 tag) followed by a variable-length encoded size.
+ * This handshake request starts with a request header two-byte API key set to 17, followed by a mechanism name,
+ * making it easy to distinguish from a GSSAPI packet.
+ */
+public class SaslHandshakeRequest extends AbstractRequest {
+
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.SASL_HANDSHAKE.id);
+    public static final String MECHANISM_KEY_NAME = "mechanism";
+
+    private final String mechanism;
+
+    public SaslHandshakeRequest(String mechanism) {
+        super(new Struct(CURRENT_SCHEMA));
+        struct.set(MECHANISM_KEY_NAME, mechanism);
+        this.mechanism = mechanism;
+    }
+
+    public SaslHandshakeRequest(Struct struct) {
+        super(struct);
+        mechanism = struct.getString(MECHANISM_KEY_NAME);
+    }
+
+    public String mechanism() {
+        return mechanism;
+    }
+
+    @Override
+    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+        switch (versionId) {
+            case 0:
+                List<String> enabledMechanisms = Collections.emptyList();
+                return new SaslHandshakeResponse(Errors.forException(e).code(), enabledMechanisms);
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.SASL_HANDSHAKE.id)));
+        }
+    }
+
+    public static SaslHandshakeRequest parse(ByteBuffer buffer, int versionId) {
+        return new SaslHandshakeRequest(ProtoUtils.parseRequest(ApiKeys.SASL_HANDSHAKE.id, versionId, buffer));
+    }
+
+    public static SaslHandshakeRequest parse(ByteBuffer buffer) {
+        return new SaslHandshakeRequest(CURRENT_SCHEMA.read(buffer));
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
new file mode 100644
index 0000000..c0fc495
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+
+/**
+ * Response from SASL server which indicates if the client-chosen mechanism is enabled in the server.
+ * For error responses, the list of enabled mechanisms is included in the response.
+ */
+public class SaslHandshakeResponse extends AbstractRequestResponse {
+
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.SASL_HANDSHAKE.id);
+
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
+    private static final String ENABLED_MECHANISMS_KEY_NAME = "enabled_mechanisms";
+
+    /**
+     * Possible error codes:
+     *   UNSUPPORTED_SASL_MECHANISM(33): Client mechanism not enabled in server
+     *   ILLEGAL_SASL_STATE(34) : Invalid request during SASL handshake
+     */
+    private final short errorCode;
+    private final List<String> enabledMechanisms;
+
+    public SaslHandshakeResponse(short errorCode, Collection<String> enabledMechanisms) {
+        super(new Struct(CURRENT_SCHEMA));
+        struct.set(ERROR_CODE_KEY_NAME, errorCode);
+        struct.set(ENABLED_MECHANISMS_KEY_NAME, enabledMechanisms.toArray());
+        this.errorCode = errorCode;
+        this.enabledMechanisms = new ArrayList<>(enabledMechanisms);
+    }
+
+    public SaslHandshakeResponse(Struct struct) {
+        super(struct);
+        errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+        Object[] mechanisms = struct.getArray(ENABLED_MECHANISMS_KEY_NAME);
+        ArrayList<String> enabledMechanisms = new ArrayList<>();
+        for (Object mechanism : mechanisms)
+            enabledMechanisms.add((String) mechanism);
+        this.enabledMechanisms = enabledMechanisms;
+    }
+
+    public short errorCode() {
+        return errorCode;
+    }
+
+    public List<String> enabledMechanisms() {
+        return enabledMechanisms;
+    }
+
+    public static SaslHandshakeResponse parse(ByteBuffer buffer) {
+        return new SaslHandshakeResponse(CURRENT_SCHEMA.read(buffer));
+    }
+
+    public static SaslHandshakeResponse parse(ByteBuffer buffer, int version) {
+        return new SaslHandshakeResponse(ProtoUtils.parseResponse(ApiKeys.SASL_HANDSHAKE.id, version, buffer));
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/security/auth/AuthCallbackHandler.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/AuthCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/auth/AuthCallbackHandler.java
new file mode 100644
index 0000000..ed2c087
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/auth/AuthCallbackHandler.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.auth;
+
+import java.util.Map;
+
+import org.apache.kafka.common.network.Mode;
+
+import javax.security.auth.Subject;
+import javax.security.auth.callback.CallbackHandler;
+
+/**
+ * Callback handler for SASL-based authentication.
+ * <p>
+ * Extends the JAAS {@link CallbackHandler} contract with explicit {@code configure} and {@code close}
+ * lifecycle methods.
+ */
+public interface AuthCallbackHandler extends CallbackHandler {
+
+    /**
+     * Configures this callback handler.
+     *
+     * @param configs Configuration as key/value pairs
+     * @param mode The mode that indicates if this is a client or server connection
+     * @param subject Subject from login context
+     * @param saslMechanism Negotiated SASL mechanism
+     */
+    void configure(Map<String, ?> configs, Mode mode, Subject subject, String saslMechanism);
+
+    /**
+     * Closes this instance.
+     */
+    void close();
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/security/auth/Login.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/Login.java b/clients/src/main/java/org/apache/kafka/common/security/auth/Login.java
new file mode 100644
index 0000000..1ac779d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/auth/Login.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.auth;
+
+import java.util.Map;
+
+import javax.security.auth.Subject;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+
+/**
+ * Login interface for authentication.
+ * <p>
+ * The declared methods suggest a lifecycle of {@code configure}, then {@code login}, then
+ * {@code subject}/{@code serviceName}, and finally {@code close}; the ordering is not enforced by
+ * this interface.
+ */
+public interface Login {
+
+    /**
+     * Configures this login instance.
+     *
+     * @param configs configuration as key/value pairs
+     * @param loginContextName name of the login context this instance should use
+     */
+    void configure(Map<String, ?> configs, String loginContextName);
+
+    /**
+     * Performs login for each login module specified for the login context of this instance.
+     *
+     * @return the login context that was used to log in
+     * @throws LoginException if the login fails
+     */
+    LoginContext login() throws LoginException;
+
+    /**
+     * Returns the authenticated subject of this login context.
+     */
+    Subject subject();
+
+    /**
+     * Returns the service name to be used for SASL.
+     */
+    String serviceName();
+
+    /**
+     * Closes this instance.
+     */
+    void close();
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/security/authenticator/AbstractLogin.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/AbstractLogin.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/AbstractLogin.java
new file mode 100644
index 0000000..2fe43ab
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/AbstractLogin.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.authenticator;
+
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+import javax.security.sasl.RealmCallback;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.Subject;
+
+import org.apache.kafka.common.security.JaasUtils;
+import org.apache.kafka.common.security.auth.Login;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Skeleton {@link Login} implementation holding the JAAS plumbing shared by typical SASL mechanisms.
+ * Subclasses supply {@code serviceName()} and {@code close()}.
+ */
+public abstract class AbstractLogin implements Login {
+    private static final Logger log = LoggerFactory.getLogger(AbstractLogin.class);
+
+    private String loginContextName;
+    private LoginContext loginContext;
+
+
+    /**
+     * Remembers the JAAS context name to log in with; {@code configs} is not read by this base class.
+     */
+    @Override
+    public void configure(Map<String, ?> configs, String loginContextName) {
+        this.loginContextName = loginContextName;
+    }
+
+    /**
+     * Verifies that the configured context has an entry in the JAAS configuration, then logs in with it.
+     *
+     * @return the login context that was used
+     * @throws IllegalArgumentException if the JAAS configuration has no entry for the configured context
+     * @throws LoginException if creating the login context or logging in fails
+     */
+    @Override
+    public LoginContext login() throws LoginException {
+        final String jaasConfigFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM);
+        final boolean propertyMissing = jaasConfigFile == null;
+        if (propertyMissing)
+            log.debug("System property '" + JaasUtils.JAVA_LOGIN_CONFIG_PARAM + "' is not set, using default JAAS configuration.");
+
+        AppConfigurationEntry[] entries = Configuration.getConfiguration().getAppConfigurationEntry(loginContextName);
+        if (entries == null) {
+            String propertyState = propertyMissing ? "not set" : jaasConfigFile;
+            throw new IllegalArgumentException("Could not find a '" + loginContextName + "' entry in the JAAS configuration. System property '" +
+                JaasUtils.JAVA_LOGIN_CONFIG_PARAM + "' is " + propertyState);
+        }
+
+        // The field is assigned before login() is attempted, so it stays set even if login() throws.
+        LoginContext context = new LoginContext(loginContextName, new LoginCallbackHandler());
+        this.loginContext = context;
+        context.login();
+        log.info("Successfully logged in.");
+        return context;
+    }
+
+    /**
+     * Returns the subject of the login context created by {@link #login()}.
+     */
+    @Override
+    public Subject subject() {
+        return this.loginContext.getSubject();
+    }
+
+    /**
+     * Callback handler for creating login context. Login callback handlers
+     * should support the callbacks required for the login modules used by
+     * the KafkaServer and KafkaClient contexts. Kafka does not support
+     * callback handlers which require additional user input.
+     *
+     */
+    public static class LoginCallbackHandler implements CallbackHandler {
+
+        /**
+         * Answers name and realm callbacks with their default values; password callbacks and any
+         * other callback type are rejected.
+         *
+         * @throws UnsupportedCallbackException for a password callback or an unrecognized callback
+         */
+        @Override
+        public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
+            for (int i = 0; i < callbacks.length; i++) {
+                Callback current = callbacks[i];
+                if (current instanceof NameCallback) {
+                    NameCallback nameCallback = (NameCallback) current;
+                    nameCallback.setName(nameCallback.getDefaultName());
+                    continue;
+                }
+                if (current instanceof PasswordCallback) {
+                    throw new UnsupportedCallbackException(current,
+                            "Could not login: the client is being asked for a password, but the Kafka" +
+                            " client code does not currently support obtaining a password from the user.");
+                }
+                if (current instanceof RealmCallback) {
+                    RealmCallback realmCallback = (RealmCallback) current;
+                    realmCallback.setText(realmCallback.getDefaultText());
+                    continue;
+                }
+                throw new UnsupportedCallbackException(current, "Unrecognized SASL Login callback");
+            }
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultLogin.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultLogin.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultLogin.java
new file mode 100644
index 0000000..0a405bc
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultLogin.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.authenticator;
+
+/**
+ * Login implementation that inherits all login behaviour from {@link AbstractLogin} and holds no
+ * resources of its own.
+ */
+public class DefaultLogin extends AbstractLogin {
+
+    /**
+     * Returns the fixed service name {@code "kafka"}.
+     */
+    @Override
+    public String serviceName() {
+        return "kafka";
+    }
+
+    /**
+     * Does nothing: this class allocates nothing that would need releasing.
+     */
+    @Override
+    public void close() {
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
new file mode 100644
index 0000000..9aec9a7
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.authenticator;
+
+import javax.security.auth.Subject;
+import javax.security.auth.login.LoginException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.Map;
+
+import org.apache.kafka.common.network.LoginType;
+import org.apache.kafka.common.security.auth.Login;
+import org.apache.kafka.common.security.kerberos.KerberosLogin;
+
+/**
+ * Reference-counted holder of a {@link Login}, cached per {@link LoginType}.
+ *
+ * The cache and the reference counts are only modified while holding the {@code LoginManager.class} monitor.
+ */
+public class LoginManager {
+
+    private static final EnumMap<LoginType, LoginManager> CACHED_INSTANCES = new EnumMap<>(LoginType.class);
+
+    private final Login login;
+    private final LoginType loginType;
+    private int refCount;
+
+    /**
+     * Creates the underlying login (Kerberos or default), configures it with the context name of
+     * `loginType` and logs in immediately.
+     */
+    private LoginManager(LoginType loginType, boolean hasKerberos, Map<String, ?> configs) throws IOException, LoginException {
+        this.loginType = loginType;
+        String loginContext = loginType.contextName();
+        login = hasKerberos ? new KerberosLogin() : new DefaultLogin();
+        login.configure(configs, loginContext);
+        login.login();
+    }
+
+    /**
+     * Returns an instance of `LoginManager` and increases its reference count.
+     *
+     * `release()` should be invoked when the `LoginManager` is no longer needed. This method will try to reuse an
+     * existing `LoginManager` for the provided `loginType` if available. However, it expects `hasKerberos` and
+     * `configs` to be the same for every invocation and it will ignore them in the case where it's returning a
+     * cached instance of `LoginManager`.
+     *
+     * This is a bit ugly and it would be nicer if we could pass the `LoginManager` to `ChannelBuilders.create` and
+     * shut it down when the broker or clients are closed. It's straightforward to do the former, but it's more
+     * complicated to do the latter without making the consumer API more complex.
+     *
+     * @param loginType the type of the login context, it should be SERVER for the broker and CLIENT for the clients
+     *                  (i.e. consumer and producer)
+     * @param hasKerberos whether a newly created instance should use `KerberosLogin` rather than `DefaultLogin`
+     * @param configs configuration as key/value pairs
+     */
+    public static final LoginManager acquireLoginManager(LoginType loginType, boolean hasKerberos, Map<String, ?> configs) throws IOException, LoginException {
+        synchronized (LoginManager.class) {
+            LoginManager loginManager = CACHED_INSTANCES.get(loginType);
+            if (loginManager == null) {
+                loginManager = new LoginManager(loginType, hasKerberos, configs);
+                CACHED_INSTANCES.put(loginType, loginManager);
+            }
+            return loginManager.acquire();
+        }
+    }
+
+    /**
+     * Returns the subject of the underlying login.
+     */
+    public Subject subject() {
+        return login.subject();
+    }
+
+    /**
+     * Returns the service name of the underlying login.
+     */
+    public String serviceName() {
+        return login.serviceName();
+    }
+
+    /* Only called from `acquireLoginManager`, which holds the class monitor. */
+    private LoginManager acquire() {
+        ++refCount;
+        return this;
+    }
+
+    /**
+     * Decrease the reference count for this instance and release resources if it reaches 0.
+     *
+     * @throws IllegalStateException if the reference count is already 0
+     */
+    public void release() {
+        synchronized (LoginManager.class) {
+            if (refCount == 0)
+                throw new IllegalStateException("release called on LoginManager with refCount == 0");
+            else if (refCount == 1) {
+                // Evict only our own cache entry: after `closeAll()` a newer instance may already be cached
+                // for the same login type, and it must not be dropped by a stale holder releasing this one.
+                if (CACHED_INSTANCES.get(loginType) == this)
+                    CACHED_INSTANCES.remove(loginType);
+                login.close();
+            }
+            --refCount;
+        }
+    }
+
+    /* Should only be used in tests. Closes every cached login and empties the cache; reference counts are left untouched. */
+    public static void closeAll() {
+        synchronized (LoginManager.class) {
+            for (LoginType loginType : new ArrayList<>(CACHED_INSTANCES.keySet())) {
+                LoginManager loginManager = CACHED_INSTANCES.remove(loginType);
+                loginManager.login.close();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
index 370e729..ba201dc 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
@@ -23,38 +23,44 @@ import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.util.Arrays;
 import java.util.Map;
-
 import java.security.Principal;
 import java.security.PrivilegedActionException;
 import java.security.PrivilegedExceptionAction;
 
 import javax.security.auth.Subject;
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.sasl.AuthorizeCallback;
-import javax.security.sasl.RealmCallback;
 import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslClient;
 import javax.security.sasl.SaslException;
 
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.IllegalSaslStateException;
+import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
 import org.apache.kafka.common.network.Authenticator;
+import org.apache.kafka.common.network.Mode;
 import org.apache.kafka.common.network.NetworkSend;
 import org.apache.kafka.common.network.NetworkReceive;
 import org.apache.kafka.common.network.TransportLayer;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.requests.RequestSend;
+import org.apache.kafka.common.requests.SaslHandshakeRequest;
+import org.apache.kafka.common.requests.SaslHandshakeResponse;
+import org.apache.kafka.common.security.auth.AuthCallbackHandler;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.PrincipalBuilder;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.common.KafkaException;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class SaslClientAuthenticator implements Authenticator {
 
     public enum SaslState {
-        INITIAL, INTERMEDIATE, COMPLETE, FAILED
+        SEND_HANDSHAKE_REQUEST, RECEIVE_HANDSHAKE_RESPONSE, INITIAL, INTERMEDIATE, COMPLETE, FAILED
     }
 
     private static final Logger LOG = LoggerFactory.getLogger(SaslClientAuthenticator.class);
@@ -63,33 +69,57 @@ public class SaslClientAuthenticator implements Authenticator {
     private final String servicePrincipal;
     private final String host;
     private final String node;
+    private final String mechanism;
+    private final boolean handshakeRequestEnable;
 
     // assigned in `configure`
     private SaslClient saslClient;
+    private Map<String, ?> configs;
     private String clientPrincipalName;
+    private AuthCallbackHandler callbackHandler;
     private TransportLayer transportLayer;
 
     // buffers used in `authenticate`
     private NetworkReceive netInBuffer;
     private NetworkSend netOutBuffer;
 
-    private SaslState saslState = SaslState.INITIAL;
+    // Current SASL state
+    private SaslState saslState;
+    // Next SASL state to be set when outgoing writes associated with the current SASL state complete
+    private SaslState pendingSaslState;
+    // Correlation ID for the next request
+    private int correlationId;
+    // Request header for which response from the server is pending
+    private RequestHeader currentRequestHeader;
 
-    public SaslClientAuthenticator(String node, Subject subject, String servicePrincipal, String host) throws IOException {
+    public SaslClientAuthenticator(String node, Subject subject, String servicePrincipal, String host, String mechanism, boolean handshakeRequestEnable) throws IOException {
         this.node = node;
         this.subject = subject;
         this.host = host;
         this.servicePrincipal = servicePrincipal;
+        this.mechanism = mechanism;
+        this.handshakeRequestEnable = handshakeRequestEnable;
+        this.correlationId = -1;
     }
 
     public void configure(TransportLayer transportLayer, PrincipalBuilder principalBuilder, Map<String, ?> configs) throws KafkaException {
         try {
             this.transportLayer = transportLayer;
+            this.configs = configs;
+
+            setSaslState(handshakeRequestEnable ? SaslState.SEND_HANDSHAKE_REQUEST : SaslState.INITIAL);
 
             // determine client principal from subject.
-            Principal clientPrincipal = subject.getPrincipals().iterator().next();
-            this.clientPrincipalName = clientPrincipal.getName();
-            this.saslClient = createSaslClient();
+            if (!subject.getPrincipals().isEmpty()) {
+                Principal clientPrincipal = subject.getPrincipals().iterator().next();
+                this.clientPrincipalName = clientPrincipal.getName();
+            } else {
+                clientPrincipalName = null;
+            }
+            callbackHandler = new SaslClientCallbackHandler();
+            callbackHandler.configure(configs, Mode.CLIENT, subject, mechanism);
+
+            saslClient = createSaslClient();
         } catch (Exception e) {
             throw new KafkaException("Failed to configure SaslClientAuthenticator", e);
         }
@@ -99,15 +129,14 @@ public class SaslClientAuthenticator implements Authenticator {
         try {
             return Subject.doAs(subject, new PrivilegedExceptionAction<SaslClient>() {
                 public SaslClient run() throws SaslException {
-                    String[] mechs = {"GSSAPI"};
+                    String[] mechs = {mechanism};
                     LOG.debug("Creating SaslClient: client={};service={};serviceHostname={};mechs={}",
                         clientPrincipalName, servicePrincipal, host, Arrays.toString(mechs));
-                    return Sasl.createSaslClient(mechs, clientPrincipalName, servicePrincipal, host, null,
-                            new ClientCallbackHandler());
+                    return Sasl.createSaslClient(mechs, clientPrincipalName, servicePrincipal, host, configs, callbackHandler);
                 }
             });
         } catch (PrivilegedActionException e) {
-            throw new KafkaException("Failed to create SaslClient", e.getCause());
+            throw new KafkaException("Failed to create SaslClient with mechanism " + mechanism, e.getCause());
         }
     }
 
@@ -123,22 +152,39 @@ public class SaslClientAuthenticator implements Authenticator {
             return;
 
         switch (saslState) {
+            case SEND_HANDSHAKE_REQUEST:
+                String clientId = (String) configs.get(CommonClientConfigs.CLIENT_ID_CONFIG);
+                currentRequestHeader = new RequestHeader(ApiKeys.SASL_HANDSHAKE.id, clientId, correlationId++);
+                SaslHandshakeRequest handshakeRequest = new SaslHandshakeRequest(mechanism);
+                send(RequestSend.serialize(currentRequestHeader, handshakeRequest.toStruct()));
+                setSaslState(SaslState.RECEIVE_HANDSHAKE_RESPONSE);
+                break;
+            case RECEIVE_HANDSHAKE_RESPONSE:
+                byte[] responseBytes = receiveResponseOrToken();
+                if (responseBytes == null)
+                    break;
+                else {
+                    try {
+                        handleKafkaResponse(currentRequestHeader, responseBytes);
+                        currentRequestHeader = null;
+                    } catch (Exception e) {
+                        setSaslState(SaslState.FAILED);
+                        throw e;
+                    }
+                    setSaslState(SaslState.INITIAL);
+                    // Fall through and start SASL authentication using the configured client mechanism
+                }
             case INITIAL:
-                sendSaslToken(new byte[0]);
-                saslState = SaslState.INTERMEDIATE;
+                sendSaslToken(new byte[0], true);
+                setSaslState(SaslState.INTERMEDIATE);
                 break;
             case INTERMEDIATE:
-                if (netInBuffer == null) netInBuffer = new NetworkReceive(node);
-                netInBuffer.readFrom(transportLayer);
-                if (netInBuffer.complete()) {
-                    netInBuffer.payload().rewind();
-                    byte[] serverToken = new byte[netInBuffer.payload().remaining()];
-                    netInBuffer.payload().get(serverToken, 0, serverToken.length);
-                    netInBuffer = null; // reset the networkReceive as we read all the data.
-                    sendSaslToken(serverToken);
+                byte[] serverToken = receiveResponseOrToken();
+                if (serverToken != null) {
+                    sendSaslToken(serverToken, false);
                 }
                 if (saslClient.isComplete()) {
-                    saslState = SaslState.COMPLETE;
+                    setSaslState(SaslState.COMPLETE);
                     transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
                 }
                 break;
@@ -149,30 +195,58 @@ public class SaslClientAuthenticator implements Authenticator {
         }
     }
 
-    private void sendSaslToken(byte[] serverToken) throws IOException {
+    private void setSaslState(SaslState saslState) {
+        if (netOutBuffer != null && !netOutBuffer.completed())
+            pendingSaslState = saslState;
+        else {
+            this.pendingSaslState = null;
+            this.saslState = saslState;
+            LOG.debug("Set SASL client state to {}", saslState);
+        }
+    }
+
+    private void sendSaslToken(byte[] serverToken, boolean isInitial) throws IOException {
         if (!saslClient.isComplete()) {
-            try {
-                byte[] saslToken = createSaslToken(serverToken);
-                if (saslToken != null) {
-                    netOutBuffer = new NetworkSend(node, ByteBuffer.wrap(saslToken));
-                    flushNetOutBufferAndUpdateInterestOps();
-                }
-            } catch (IOException e) {
-                saslState = SaslState.FAILED;
-                throw e;
-            }
+            byte[] saslToken = createSaslToken(serverToken, isInitial);
+            if (saslToken != null)
+                send(ByteBuffer.wrap(saslToken));
+        }
+    }
+
+    private void send(ByteBuffer buffer) throws IOException {
+        try {
+            netOutBuffer = new NetworkSend(node, buffer);
+            flushNetOutBufferAndUpdateInterestOps();
+        } catch (IOException e) {
+            setSaslState(SaslState.FAILED);
+            throw e;
         }
     }
 
     private boolean flushNetOutBufferAndUpdateInterestOps() throws IOException {
         boolean flushedCompletely = flushNetOutBuffer();
-        if (flushedCompletely)
+        if (flushedCompletely) {
             transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
-        else
+            if (pendingSaslState != null)
+                setSaslState(pendingSaslState);
+        } else
             transportLayer.addInterestOps(SelectionKey.OP_WRITE);
         return flushedCompletely;
     }
 
+    private byte[] receiveResponseOrToken() throws IOException {
+        if (netInBuffer == null) netInBuffer = new NetworkReceive(node);
+        netInBuffer.readFrom(transportLayer);
+        byte[] serverPacket = null;
+        if (netInBuffer.complete()) {
+            netInBuffer.payload().rewind();
+            serverPacket = new byte[netInBuffer.payload().remaining()];
+            netInBuffer.payload().get(serverPacket, 0, serverPacket.length);
+            netInBuffer = null; // reset the networkReceive as we read all the data.
+        }
+        return serverPacket;
+    }
+
     public Principal principal() {
         return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, clientPrincipalName);
     }
@@ -182,19 +256,25 @@ public class SaslClientAuthenticator implements Authenticator {
     }
 
     public void close() throws IOException {
-        saslClient.dispose();
+        if (saslClient != null)
+            saslClient.dispose();
+        if (callbackHandler != null)
+            callbackHandler.close();
     }
 
-    private byte[] createSaslToken(final byte[] saslToken) throws SaslException {
+    private byte[] createSaslToken(final byte[] saslToken, boolean isInitial) throws SaslException {
         if (saslToken == null)
             throw new SaslException("Error authenticating with the Kafka Broker: received a `null` saslToken.");
 
         try {
-            return Subject.doAs(subject, new PrivilegedExceptionAction<byte[]>() {
-                public byte[] run() throws SaslException {
-                    return saslClient.evaluateChallenge(saslToken);
-                }
-            });
+            if (isInitial && !saslClient.hasInitialResponse())
+                return saslToken;
+            else
+                return Subject.doAs(subject, new PrivilegedExceptionAction<byte[]>() {
+                    public byte[] run() throws SaslException {
+                        return saslClient.evaluateChallenge(saslToken);
+                    }
+                });
         } catch (PrivilegedActionException e) {
             String error = "An error: (" + e + ") occurred when evaluating SASL token received from the Kafka Broker.";
             // Try to provide hints to use about what went wrong so they can fix their configuration.
@@ -221,35 +301,39 @@ public class SaslClientAuthenticator implements Authenticator {
         return netOutBuffer.completed();
     }
 
-    public static class ClientCallbackHandler implements CallbackHandler {
-
-        public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
-            for (Callback callback : callbacks) {
-                if (callback instanceof NameCallback) {
-                    NameCallback nc = (NameCallback) callback;
-                    nc.setName(nc.getDefaultName());
-                } else if (callback instanceof PasswordCallback) {
-                    // Call `setPassword` once we support obtaining a password from the user and update message below
-                    throw new UnsupportedCallbackException(callback, "Could not login: the client is being asked for a password, but the Kafka" +
-                             " client code does not currently support obtaining a password from the user." +
-                             " Make sure -Djava.security.auth.login.config property passed to JVM and" +
-                             " the client is configured to use a ticket cache (using" +
-                             " the JAAS configuration setting 'useTicketCache=true)'. Make sure you are using" +
-                             " FQDN of the Kafka broker you are trying to connect to.");
-                } else if (callback instanceof RealmCallback) {
-                    RealmCallback rc = (RealmCallback) callback;
-                    rc.setText(rc.getDefaultText());
-                } else if (callback instanceof AuthorizeCallback) {
-                    AuthorizeCallback ac = (AuthorizeCallback) callback;
-                    String authId = ac.getAuthenticationID();
-                    String authzId = ac.getAuthorizationID();
-                    ac.setAuthorized(authId.equals(authzId));
-                    if (ac.isAuthorized())
-                        ac.setAuthorizedID(authzId);
-                } else {
-                    throw new UnsupportedCallbackException(callback, "Unrecognized SASL ClientCallback");
-                }
-            }
+    private void handleKafkaResponse(RequestHeader requestHeader, byte[] responseBytes) {
+        Struct struct;
+        ApiKeys apiKey;
+        try {
+            struct = NetworkClient.parseResponse(ByteBuffer.wrap(responseBytes), requestHeader);
+            apiKey = ApiKeys.forId(requestHeader.apiKey());
+        } catch (SchemaException | IllegalArgumentException e) {
+            LOG.debug("Invalid SASL mechanism response, server may be expecting only GSSAPI tokens");
+            throw new AuthenticationException("Invalid SASL mechanism response", e);
+        }
+        switch (apiKey) {
+            case SASL_HANDSHAKE:
+                handleSaslHandshakeResponse(new SaslHandshakeResponse(struct));
+                break;
+            default:
+                throw new IllegalStateException("Unexpected API key during handshake: " + apiKey);
+        }
+    }
+
+    private void handleSaslHandshakeResponse(SaslHandshakeResponse response) {
+        Errors error = Errors.forCode(response.errorCode());
+        switch (error) {
+            case NONE:
+                break;
+            case UNSUPPORTED_SASL_MECHANISM:
+                throw new UnsupportedSaslMechanismException(String.format("Client SASL mechanism '%s' not enabled in the server, enabled mechanisms are %s",
+                    mechanism, response.enabledMechanisms()));
+            case ILLEGAL_SASL_STATE:
+                throw new IllegalSaslStateException(String.format("Unexpected handshake request with client mechanism %s, enabled mechanisms are %s",
+                    mechanism, response.enabledMechanisms()));
+            default:
+                throw new AuthenticationException(String.format("Unknown error code %d, client mechanism is %s, enabled mechanisms are %s",
+                    response.errorCode(), mechanism, response.enabledMechanisms()));
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
new file mode 100644
index 0000000..8e0b8db
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.authenticator;
+
+import java.util.Map;
+
+import javax.security.auth.Subject;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.network.Mode;
+import org.apache.kafka.common.security.auth.AuthCallbackHandler;
+
+/**
+ * Callback handler for SASL clients. This handler should support every callback required
+ * by the SASL mechanism configured for the client. See the
+ * <a href="https://docs.oracle.com/javase/8/docs/technotes/guides/security/sasl/sasl-refguide.html">Java SASL API</a>
+ * reference guide for the list of callbacks required by each SASL mechanism.
+ */
+public class SaslClientCallbackHandler implements AuthCallbackHandler {
+
+    private boolean isKerberos;
+    private Subject subject;
+
+    @Override
+    public void configure(Map<String, ?> configs, Mode mode, Subject subject, String mechanism) {
+        this.isKerberos = mechanism.equals(SaslConfigs.GSSAPI_MECHANISM);
+        this.subject = subject;
+    }
+
+    @Override
+    public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
+        for (Callback callback : callbacks) {
+            if (callback instanceof NameCallback) {
+                NameCallback nc = (NameCallback) callback;
+                if (!isKerberos && subject != null && !subject.getPublicCredentials(String.class).isEmpty()) {
+                    nc.setName(subject.getPublicCredentials(String.class).iterator().next());
+                } else
+                    nc.setName(nc.getDefaultName());
+            } else if (callback instanceof PasswordCallback) {
+                if (!isKerberos && subject != null && !subject.getPrivateCredentials(String.class).isEmpty()) {
+                    char [] password = subject.getPrivateCredentials(String.class).iterator().next().toCharArray();
+                    ((PasswordCallback) callback).setPassword(password);
+                } else {
+                    String errorMessage = "Could not login: the client is being asked for a password, but the Kafka" +
+                             " client code does not currently support obtaining a password from the user.";
+                    if (isKerberos) {
+                        errorMessage += " Make sure -Djava.security.auth.login.config property passed to JVM and" +
+                             " the client is configured to use a ticket cache (using" +
+                             " the JAAS configuration setting 'useTicketCache=true)'. Make sure you are using" +
+                             " FQDN of the Kafka broker you are trying to connect to.";
+                    }
+                    throw new UnsupportedCallbackException(callback, errorMessage);
+                }
+            } else if (callback instanceof RealmCallback) {
+                RealmCallback rc = (RealmCallback) callback;
+                rc.setText(rc.getDefaultText());
+            } else if (callback instanceof AuthorizeCallback) {
+                AuthorizeCallback ac = (AuthorizeCallback) callback;
+                String authId = ac.getAuthenticationID();
+                String authzId = ac.getAuthorizationID();
+                ac.setAuthorized(authId.equals(authzId));
+                if (ac.isAuthorized())
+                    ac.setAuthorizedID(authzId);
+            } else {
+                throw new UnsupportedCallbackException(callback, "Unrecognized SASL ClientCallback");
+            }
+        }
+    }
+
+    @Override
+    public void close() {
+    }
+}
\ No newline at end of file