You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/30 00:05:38 UTC
[24/50] [abbrv] kafka git commit: KAFKA-3149; Extend SASL implementation to support more mechanisms
KAFKA-3149; Extend SASL implementation to support more mechanisms
Code changes corresponding to KIP-43 to enable review of the KIP.
Author: Rajini Sivaram <ra...@googlemail.com>
Reviewers: Jun Rao <ju...@apache.org>, Ismael Juma <is...@juma.me.uk>
Closes #812 from rajinisivaram/KAFKA-3149
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5b375d7b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5b375d7b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5b375d7b
Branch: refs/heads/0.10.0
Commit: 5b375d7bf9b26aaeed06bac2dc5de3f8214cbad4
Parents: cd427c9
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Tue Apr 26 16:56:42 2016 -0700
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Tue Apr 26 16:56:42 2016 -0700
----------------------------------------------------------------------
checkstyle/import-control.xml | 7 +
.../org/apache/kafka/clients/ClientUtils.java | 4 +-
.../org/apache/kafka/clients/NetworkClient.java | 20 +-
.../apache/kafka/common/config/SaslConfigs.java | 15 +-
.../common/errors/AuthenticationException.java | 27 ++
.../errors/IllegalSaslStateException.java | 27 ++
.../UnsupportedSaslMechanismException.java | 27 ++
.../kafka/common/network/ChannelBuilders.java | 14 +-
.../common/network/SaslChannelBuilder.java | 36 +-
.../apache/kafka/common/protocol/ApiKeys.java | 3 +-
.../apache/kafka/common/protocol/Errors.java | 8 +-
.../apache/kafka/common/protocol/Protocol.java | 13 +
.../kafka/common/requests/AbstractRequest.java | 2 +
.../kafka/common/requests/ResponseSend.java | 2 +-
.../common/requests/SaslHandshakeRequest.java | 83 ++++
.../common/requests/SaslHandshakeResponse.java | 85 ++++
.../security/auth/AuthCallbackHandler.java | 46 +++
.../kafka/common/security/auth/Login.java | 57 +++
.../security/authenticator/AbstractLogin.java | 108 +++++
.../security/authenticator/DefaultLogin.java | 32 ++
.../security/authenticator/LoginManager.java | 112 ++++++
.../authenticator/SaslClientAuthenticator.java | 242 ++++++++----
.../SaslClientCallbackHandler.java | 94 +++++
.../authenticator/SaslServerAuthenticator.java | 195 +++++++--
.../SaslServerCallbackHandler.java | 22 +-
.../common/security/kerberos/KerberosLogin.java | 392 +++++++++++++++++++
.../kafka/common/security/kerberos/Login.java | 379 ------------------
.../common/security/kerberos/LoginManager.java | 130 ------
.../common/security/plain/PlainLoginModule.java | 66 ++++
.../common/security/plain/PlainSaslServer.java | 170 ++++++++
.../security/plain/PlainSaslServerProvider.java | 38 ++
.../common/requests/RequestResponseTest.java | 11 +-
.../org/apache/kafka/test/TestSslUtils.java | 3 +
.../main/scala/kafka/common/ErrorMapping.scala | 2 +
.../controller/ControllerChannelManager.scala | 10 +-
.../main/scala/kafka/network/SocketServer.scala | 2 +-
.../src/main/scala/kafka/server/KafkaApis.scala | 8 +
.../main/scala/kafka/server/KafkaConfig.scala | 18 +-
.../main/scala/kafka/server/KafkaServer.scala | 9 +-
.../kafka/server/ReplicaFetcherThread.scala | 10 +-
.../kafka/api/BaseConsumerTest.scala | 9 +-
.../kafka/api/BaseProducerSendTest.scala | 4 +-
.../kafka/api/EndToEndAuthorizationTest.scala | 4 +-
.../kafka/api/IntegrationTestHarness.scala | 8 +-
.../kafka/api/PlaintextConsumerTest.scala | 2 +-
.../api/SaslMultiMechanismConsumerTest.scala | 86 ++++
.../api/SaslPlainPlaintextConsumerTest.scala | 27 ++
.../scala/integration/kafka/api/SaslSetup.scala | 36 +-
.../integration/kafka/api/SaslTestHarness.scala | 22 +-
.../integration/KafkaServerTestHarness.scala | 4 +-
.../unit/kafka/server/KafkaConfigTest.scala | 2 +
.../scala/unit/kafka/utils/JaasTestUtils.scala | 122 ++++--
.../test/scala/unit/kafka/utils/TestUtils.scala | 40 +-
53 files changed, 2171 insertions(+), 724 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 051c8d1..e94698c 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -69,6 +69,13 @@
<allow pkg="org.apache.kafka.common.annotation" />
<allow pkg="org.apache.kafka.common.network" />
<allow pkg="org.apache.kafka.common.config" />
+ <subpackage name="authenticator">
+ <allow pkg="org.apache.kafka.common.protocol" />
+ <allow pkg="org.apache.kafka.common.protocol.types" />
+ <allow pkg="org.apache.kafka.common.requests" />
+ <allow pkg="org.apache.kafka.common.errors" />
+ <allow pkg="org.apache.kafka.clients" />
+ </subpackage>
</subpackage>
<subpackage name="protocol">
http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
index 0201257..ad9c5d0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.SaslConfigs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,7 +77,8 @@ public class ClientUtils {
SecurityProtocol securityProtocol = SecurityProtocol.forName((String) configs.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
if (!SecurityProtocol.nonTestingValues().contains(securityProtocol))
throw new ConfigException("Invalid SecurityProtocol " + securityProtocol);
- return ChannelBuilders.create(securityProtocol, Mode.CLIENT, LoginType.CLIENT, configs);
+ String clientSaslMechanism = (String) configs.get(SaslConfigs.SASL_MECHANISM);
+ return ChannelBuilders.create(securityProtocol, Mode.CLIENT, LoginType.CLIENT, configs, clientSaslMechanism, true);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index d2eaace..cc5dc6f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -373,6 +374,16 @@ public class NetworkClient implements KafkaClient {
return found;
}
+ public static Struct parseResponse(ByteBuffer responseBuffer, RequestHeader requestHeader) {
+ ResponseHeader responseHeader = ResponseHeader.parse(responseBuffer);
+ // Always expect the response version id to be the same as the request version id
+ short apiKey = requestHeader.apiKey();
+ short apiVer = requestHeader.apiVersion();
+ Struct responseBody = ProtoUtils.responseSchema(apiKey, apiVer).read(responseBuffer);
+ correlate(requestHeader, responseHeader);
+ return responseBody;
+ }
+
/**
* Post process disconnection of a node
*
@@ -437,12 +448,7 @@ public class NetworkClient implements KafkaClient {
for (NetworkReceive receive : this.selector.completedReceives()) {
String source = receive.source();
ClientRequest req = inFlightRequests.completeNext(source);
- ResponseHeader header = ResponseHeader.parse(receive.payload());
- // Always expect the response version id to be the same as the request version id
- short apiKey = req.request().header().apiKey();
- short apiVer = req.request().header().apiVersion();
- Struct body = ProtoUtils.responseSchema(apiKey, apiVer).read(receive.payload());
- correlate(req.request().header(), header);
+ Struct body = parseResponse(receive.payload(), req.request().header());
if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
responses.add(new ClientResponse(req, now, false, body));
}
@@ -477,7 +483,7 @@ public class NetworkClient implements KafkaClient {
/**
* Validate that the response corresponds to the request we expect or else explode
*/
- private void correlate(RequestHeader requestHeader, ResponseHeader responseHeader) {
+ private static void correlate(RequestHeader requestHeader, ResponseHeader responseHeader) {
if (requestHeader.correlationId() != responseHeader.correlationId())
throw new IllegalStateException("Correlation id for response (" + responseHeader.correlationId()
+ ") does not match request (" + requestHeader.correlationId() + ")");
http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
index d61838f..d3aa0d6 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
@@ -20,6 +20,17 @@ public class SaslConfigs {
/*
* NOTE: DO NOT CHANGE EITHER CONFIG NAMES AS THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE.
*/
+ /** SASL mechanism configuration - standard mechanism names are listed <a href="http://www.iana.org/assignments/sasl-mechanisms/sasl-mechanisms.xhtml">here</a>. */
+ public static final String SASL_MECHANISM = "sasl.mechanism";
+ public static final String SASL_MECHANISM_DOC = "SASL mechanism used for client connections. This may be any mechanism for which a security provider is available. GSSAPI is the default mechanism.";
+ public static final String GSSAPI_MECHANISM = "GSSAPI";
+ public static final String DEFAULT_SASL_MECHANISM = GSSAPI_MECHANISM;
+
+ public static final String SASL_ENABLED_MECHANISMS = "sasl.enabled.mechanisms";
+ public static final String SASL_ENABLED_MECHANISMS_DOC = "The list of SASL mechanisms enabled in the Kafka server. "
+ + "The list may contain any mechanism for which a security provider is available. "
+ + "Only GSSAPI is enabled by default.";
+ public static final List<String> DEFAULT_SASL_ENABLED_MECHANISMS = Collections.singletonList(GSSAPI_MECHANISM);
public static final String SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name";
public static final String SASL_KERBEROS_SERVICE_NAME_DOC = "The Kerberos principal name that Kafka runs as. "
@@ -54,7 +65,7 @@ public class SaslConfigs {
.define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, ConfigDef.Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC)
.define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC)
.define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC)
- .define(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, ConfigDef.Type.LONG, SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC);
+ .define(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, ConfigDef.Type.LONG, SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC)
+ .define(SaslConfigs.SASL_MECHANISM, ConfigDef.Type.STRING, SaslConfigs.DEFAULT_SASL_MECHANISM, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_MECHANISM_DOC);
}
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationException.java b/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationException.java
new file mode 100644
index 0000000..7b60e11
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationException.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class AuthenticationException extends ApiException {
+
+ private static final long serialVersionUID = 1L;
+
+ public AuthenticationException(String message) {
+ super(message);
+ }
+
+ public AuthenticationException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/errors/IllegalSaslStateException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/IllegalSaslStateException.java b/clients/src/main/java/org/apache/kafka/common/errors/IllegalSaslStateException.java
new file mode 100644
index 0000000..7fd008c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/IllegalSaslStateException.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class IllegalSaslStateException extends AuthenticationException {
+
+ private static final long serialVersionUID = 1L;
+
+ public IllegalSaslStateException(String message) {
+ super(message);
+ }
+
+ public IllegalSaslStateException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedSaslMechanismException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedSaslMechanismException.java b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedSaslMechanismException.java
new file mode 100644
index 0000000..289a09f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedSaslMechanismException.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class UnsupportedSaslMechanismException extends AuthenticationException {
+
+ private static final long serialVersionUID = 1L;
+
+ public UnsupportedSaslMechanismException(String message) {
+ super(message);
+ }
+
+ public UnsupportedSaslMechanismException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
index 669f269..2d6ba8a 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
@@ -31,10 +31,18 @@ public class ChannelBuilders {
* it is ignored otherwise
* @param loginType the loginType, it must be non-null if `securityProtocol` is SASL_*; it is ignored otherwise
* @param configs client/server configs
+ * @param clientSaslMechanism SASL mechanism if mode is CLIENT, ignored otherwise
+ * @param saslHandshakeRequestEnable flag to enable Sasl handshake requests; disabled only for SASL
+ * inter-broker connections with inter-broker protocol version < 0.10
* @return the configured `ChannelBuilder`
* @throws IllegalArgumentException if `mode` invariants described above is not maintained
*/
- public static ChannelBuilder create(SecurityProtocol securityProtocol, Mode mode, LoginType loginType, Map<String, ?> configs) {
+ public static ChannelBuilder create(SecurityProtocol securityProtocol,
+ Mode mode,
+ LoginType loginType,
+ Map<String, ?> configs,
+ String clientSaslMechanism,
+ boolean saslHandshakeRequestEnable) {
ChannelBuilder channelBuilder;
switch (securityProtocol) {
@@ -47,7 +55,9 @@ public class ChannelBuilders {
requireNonNullMode(mode, securityProtocol);
if (loginType == null)
throw new IllegalArgumentException("`loginType` must be non-null if `securityProtocol` is `" + securityProtocol + "`");
- channelBuilder = new SaslChannelBuilder(mode, loginType, securityProtocol);
+ if (mode == Mode.CLIENT && clientSaslMechanism == null)
+ throw new IllegalArgumentException("`clientSaslMechanism` must be non-null in client mode if `securityProtocol` is `" + securityProtocol + "`");
+ channelBuilder = new SaslChannelBuilder(mode, loginType, securityProtocol, clientSaslMechanism, saslHandshakeRequestEnable);
break;
case PLAINTEXT:
case TRACE:
http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index 0cd5bfe..a0464bc 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -21,13 +21,12 @@ import java.util.Map;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
-import org.apache.kafka.common.security.kerberos.LoginManager;
+import org.apache.kafka.common.security.authenticator.LoginManager;
import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator;
import org.apache.kafka.common.security.authenticator.SaslServerAuthenticator;
import org.apache.kafka.common.security.ssl.SslFactory;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.KafkaException;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,24 +34,34 @@ public class SaslChannelBuilder implements ChannelBuilder {
private static final Logger log = LoggerFactory.getLogger(SaslChannelBuilder.class);
private final SecurityProtocol securityProtocol;
+ private final String clientSaslMechanism;
private final Mode mode;
private final LoginType loginType;
+ private final boolean handshakeRequestEnable;
private LoginManager loginManager;
private SslFactory sslFactory;
private Map<String, ?> configs;
private KerberosShortNamer kerberosShortNamer;
- public SaslChannelBuilder(Mode mode, LoginType loginType, SecurityProtocol securityProtocol) {
+ public SaslChannelBuilder(Mode mode, LoginType loginType, SecurityProtocol securityProtocol, String clientSaslMechanism, boolean handshakeRequestEnable) {
this.mode = mode;
this.loginType = loginType;
this.securityProtocol = securityProtocol;
+ this.handshakeRequestEnable = handshakeRequestEnable;
+ this.clientSaslMechanism = clientSaslMechanism;
}
public void configure(Map<String, ?> configs) throws KafkaException {
try {
this.configs = configs;
- this.loginManager = LoginManager.acquireLoginManager(loginType, configs);
+ boolean hasKerberos;
+ if (mode == Mode.SERVER) {
+ List<String> enabledMechanisms = (List<String>) this.configs.get(SaslConfigs.SASL_ENABLED_MECHANISMS);
+ hasKerberos = enabledMechanisms == null || enabledMechanisms.contains(SaslConfigs.GSSAPI_MECHANISM);
+ } else {
+ hasKerberos = clientSaslMechanism.equals(SaslConfigs.GSSAPI_MECHANISM);
+ }
String defaultRealm;
try {
@@ -61,10 +70,13 @@ public class SaslChannelBuilder implements ChannelBuilder {
defaultRealm = "";
}
- @SuppressWarnings("unchecked")
- List<String> principalToLocalRules = (List<String>) configs.get(SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES);
- if (principalToLocalRules != null)
- kerberosShortNamer = KerberosShortNamer.fromUnparsedRules(defaultRealm, principalToLocalRules);
+ if (hasKerberos) {
+ @SuppressWarnings("unchecked")
+ List<String> principalToLocalRules = (List<String>) configs.get(SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES);
+ if (principalToLocalRules != null)
+ kerberosShortNamer = KerberosShortNamer.fromUnparsedRules(defaultRealm, principalToLocalRules);
+ }
+ this.loginManager = LoginManager.acquireLoginManager(loginType, hasKerberos, configs);
if (this.securityProtocol == SecurityProtocol.SASL_SSL) {
// Disable SSL client authentication as we are using SASL authentication
@@ -82,10 +94,11 @@ public class SaslChannelBuilder implements ChannelBuilder {
TransportLayer transportLayer = buildTransportLayer(id, key, socketChannel);
Authenticator authenticator;
if (mode == Mode.SERVER)
- authenticator = new SaslServerAuthenticator(id, loginManager.subject(), kerberosShortNamer, maxReceiveSize);
+ authenticator = new SaslServerAuthenticator(id, loginManager.subject(), kerberosShortNamer,
+ socketChannel.socket().getLocalAddress().getHostName(), maxReceiveSize);
else
authenticator = new SaslClientAuthenticator(id, loginManager.subject(), loginManager.serviceName(),
- socketChannel.socket().getInetAddress().getHostName());
+ socketChannel.socket().getInetAddress().getHostName(), clientSaslMechanism, handshakeRequestEnable);
// Both authenticators don't use `PrincipalBuilder`, so we pass `null` for now. Reconsider if this changes.
authenticator.configure(transportLayer, null, this.configs);
return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
@@ -96,7 +109,8 @@ public class SaslChannelBuilder implements ChannelBuilder {
}
public void close() {
- this.loginManager.release();
+ if (this.loginManager != null)
+ this.loginManager.release();
}
protected TransportLayer buildTransportLayer(String id, SelectionKey key, SocketChannel socketChannel) throws IOException {
http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index e8fd3d3..512a121 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -36,7 +36,8 @@ public enum ApiKeys {
LEAVE_GROUP(13, "LeaveGroup"),
SYNC_GROUP(14, "SyncGroup"),
DESCRIBE_GROUPS(15, "DescribeGroups"),
- LIST_GROUPS(16, "ListGroups");
+ LIST_GROUPS(16, "ListGroups"),
+ SASL_HANDSHAKE(17, "SaslHandshake");
private static final ApiKeys[] ID_TO_TYPE;
private static final int MIN_API_KEY = 0;
http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 0f33516..9013399 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException;
import org.apache.kafka.common.errors.GroupLoadInProgressException;
import org.apache.kafka.common.errors.IllegalGenerationException;
+import org.apache.kafka.common.errors.IllegalSaslStateException;
import org.apache.kafka.common.errors.InconsistentGroupProtocolException;
import org.apache.kafka.common.errors.InvalidCommitOffsetSizeException;
import org.apache.kafka.common.errors.InvalidFetchSizeException;
@@ -49,6 +50,7 @@ import org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.ReplicaNotAvailableException;
import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
@@ -130,7 +132,11 @@ public enum Errors {
CLUSTER_AUTHORIZATION_FAILED(31,
new ClusterAuthorizationException("Cluster authorization failed.")),
INVALID_TIMESTAMP(32,
- new InvalidTimestampException("The timestamp of the message is out of acceptable range."));
+ new InvalidTimestampException("The timestamp of the message is out of acceptable range.")),
+ UNSUPPORTED_SASL_MECHANISM(33,
+ new UnsupportedSaslMechanismException("The broker does not support the requested SASL mechanism.")),
+ ILLEGAL_SASL_STATE(34,
+ new IllegalSaslStateException("Request is not valid given the current SASL state."));
private static final Logger log = LoggerFactory.getLogger(Errors.class);
http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 248b7ec..bf76557 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -718,6 +718,17 @@ public class Protocol {
public static final Schema[] UPDATE_METADATA_REQUEST = new Schema[] {UPDATE_METADATA_REQUEST_V0, UPDATE_METADATA_REQUEST_V1, UPDATE_METADATA_REQUEST_V2};
public static final Schema[] UPDATE_METADATA_RESPONSE = new Schema[] {UPDATE_METADATA_RESPONSE_V0, UPDATE_METADATA_RESPONSE_V1, UPDATE_METADATA_RESPONSE_V2};
+ /* SASL handshake api */
+ public static final Schema SASL_HANDSHAKE_REQUEST_V0 = new Schema(
+ new Field("mechanism", STRING, "SASL Mechanism chosen by the client."));
+
+ public static final Schema SASL_HANDSHAKE_RESPONSE_V0 = new Schema(
+ new Field("error_code", INT16),
+ new Field("enabled_mechanisms", new ArrayOf(Type.STRING), "Array of mechanisms enabled in the server."));
+
+ public static final Schema[] SASL_HANDSHAKE_REQUEST = new Schema[] {SASL_HANDSHAKE_REQUEST_V0};
+ public static final Schema[] SASL_HANDSHAKE_RESPONSE = new Schema[] {SASL_HANDSHAKE_RESPONSE_V0};
+
/* an array of all requests and responses with all schema versions; a null value in the inner array means that the
* particular version is not supported */
public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
@@ -744,6 +755,7 @@ public class Protocol {
REQUESTS[ApiKeys.SYNC_GROUP.id] = SYNC_GROUP_REQUEST;
REQUESTS[ApiKeys.DESCRIBE_GROUPS.id] = DESCRIBE_GROUPS_REQUEST;
REQUESTS[ApiKeys.LIST_GROUPS.id] = LIST_GROUPS_REQUEST;
+ REQUESTS[ApiKeys.SASL_HANDSHAKE.id] = SASL_HANDSHAKE_REQUEST;
RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE;
@@ -762,6 +774,7 @@ public class Protocol {
RESPONSES[ApiKeys.SYNC_GROUP.id] = SYNC_GROUP_RESPONSE;
RESPONSES[ApiKeys.DESCRIBE_GROUPS.id] = DESCRIBE_GROUPS_RESPONSE;
RESPONSES[ApiKeys.LIST_GROUPS.id] = LIST_GROUPS_RESPONSE;
+ RESPONSES[ApiKeys.SASL_HANDSHAKE.id] = SASL_HANDSHAKE_RESPONSE;
/* set the maximum version of each api */
for (ApiKeys api : ApiKeys.values())
http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 5a40b7f..89c2ce1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -72,6 +72,8 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
return DescribeGroupsRequest.parse(buffer, versionId);
case LIST_GROUPS:
return ListGroupsRequest.parse(buffer, versionId);
+ case SASL_HANDSHAKE:
+ return SaslHandshakeRequest.parse(buffer, versionId);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `getRequest`, the " +
"code should be updated to do so.", apiKey));
http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java b/clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java
index 12b06d1..9494de7 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java
@@ -31,7 +31,7 @@ public class ResponseSend extends NetworkSend {
this(destination, header, response.toStruct());
}
- private static ByteBuffer serialize(ResponseHeader header, Struct body) {
+ public static ByteBuffer serialize(ResponseHeader header, Struct body) {
ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + body.sizeOf());
header.writeTo(buffer);
body.writeTo(buffer);
http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
new file mode 100644
index 0000000..bddc9f0
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+
+/**
+ * Request from SASL client containing client SASL mechanism.
+ * <p>
+ * For interoperability with Kafka 0.9.0.x, the mechanism flow may be omitted when using GSSAPI. Hence
+ * this request should not conflict with the first GSSAPI client packet. For GSSAPI, the first context
+ * establishment packet starts with byte 0x60 (APPLICATION-0 tag) followed by a variable-length encoded size.
+ * This handshake request starts with a request header two-byte API key set to 17, followed by a mechanism name,
+ * making it easy to distinguish from a GSSAPI packet.
+ */
+public class SaslHandshakeRequest extends AbstractRequest {
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.SASL_HANDSHAKE.id);
+ public static final String MECHANISM_KEY_NAME = "mechanism";
+
+ private final String mechanism;
+
+ public SaslHandshakeRequest(String mechanism) {
+ super(new Struct(CURRENT_SCHEMA));
+ struct.set(MECHANISM_KEY_NAME, mechanism);
+ this.mechanism = mechanism;
+ }
+
+ public SaslHandshakeRequest(Struct struct) {
+ super(struct);
+ mechanism = struct.getString(MECHANISM_KEY_NAME);
+ }
+
+ public String mechanism() {
+ return mechanism;
+ }
+
+ @Override
+ public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+ switch (versionId) {
+ case 0:
+ List<String> enabledMechanisms = Collections.emptyList();
+ return new SaslHandshakeResponse(Errors.forException(e).code(), enabledMechanisms);
+ default:
+ throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+ versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.SASL_HANDSHAKE.id)));
+ }
+ }
+
+ public static SaslHandshakeRequest parse(ByteBuffer buffer, int versionId) {
+ return new SaslHandshakeRequest(ProtoUtils.parseRequest(ApiKeys.SASL_HANDSHAKE.id, versionId, buffer));
+ }
+
+ public static SaslHandshakeRequest parse(ByteBuffer buffer) {
+ return new SaslHandshakeRequest(CURRENT_SCHEMA.read(buffer));
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
new file mode 100644
index 0000000..c0fc495
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+
+/**
+ * Response from SASL server which indicates if the client-chosen mechanism is enabled in the server.
+ * For error responses, the list of enabled mechanisms is included in the response.
+ */
+public class SaslHandshakeResponse extends AbstractRequestResponse {
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.SASL_HANDSHAKE.id);
+
+ private static final String ERROR_CODE_KEY_NAME = "error_code";
+ private static final String ENABLED_MECHANISMS_KEY_NAME = "enabled_mechanisms";
+
+ /**
+ * Possible error codes:
+ * UNSUPPORTED_SASL_MECHANISM(33): Client mechanism not enabled in server
+ * ILLEGAL_SASL_STATE(34) : Invalid request during SASL handshake
+ */
+ private final short errorCode;
+ private final List<String> enabledMechanisms;
+
+ public SaslHandshakeResponse(short errorCode, Collection<String> enabledMechanisms) {
+ super(new Struct(CURRENT_SCHEMA));
+ struct.set(ERROR_CODE_KEY_NAME, errorCode);
+ struct.set(ENABLED_MECHANISMS_KEY_NAME, enabledMechanisms.toArray());
+ this.errorCode = errorCode;
+ this.enabledMechanisms = new ArrayList<>(enabledMechanisms);
+ }
+
+ public SaslHandshakeResponse(Struct struct) {
+ super(struct);
+ errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+ Object[] mechanisms = struct.getArray(ENABLED_MECHANISMS_KEY_NAME);
+ ArrayList<String> enabledMechanisms = new ArrayList<>();
+ for (Object mechanism : mechanisms)
+ enabledMechanisms.add((String) mechanism);
+ this.enabledMechanisms = enabledMechanisms;
+ }
+
+ public short errorCode() {
+ return errorCode;
+ }
+
+ public List<String> enabledMechanisms() {
+ return enabledMechanisms;
+ }
+
+ public static SaslHandshakeResponse parse(ByteBuffer buffer) {
+ return new SaslHandshakeResponse(CURRENT_SCHEMA.read(buffer));
+ }
+
+ public static SaslHandshakeResponse parse(ByteBuffer buffer, int version) {
+ return new SaslHandshakeResponse(ProtoUtils.parseResponse(ApiKeys.SASL_HANDSHAKE.id, version, buffer));
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/security/auth/AuthCallbackHandler.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/AuthCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/auth/AuthCallbackHandler.java
new file mode 100644
index 0000000..ed2c087
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/auth/AuthCallbackHandler.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.auth;
+
+import java.util.Map;
+
+import org.apache.kafka.common.network.Mode;
+
+import javax.security.auth.Subject;
+import javax.security.auth.callback.CallbackHandler;
+
+/**
+ * Callback handler for SASL-based authentication.
+ */
+public interface AuthCallbackHandler extends CallbackHandler {
+
+ /**
+ * Configures this callback handler.
+ *
+ * @param configs Configuration
+ * @param mode The mode that indicates if this is a client or server connection
+ * @param subject Subject from login context
+ * @param saslMechanism Negotiated SASL mechanism
+ */
+ void configure(Map<String, ?> configs, Mode mode, Subject subject, String saslMechanism);
+
+ /**
+ * Closes this instance.
+ */
+ void close();
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/security/auth/Login.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/Login.java b/clients/src/main/java/org/apache/kafka/common/security/auth/Login.java
new file mode 100644
index 0000000..1ac779d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/auth/Login.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.auth;
+
+import java.util.Map;
+
+import javax.security.auth.Subject;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+
+/**
+ * Login interface for authentication.
+ */
+public interface Login {
+
+ /**
+ * Configures this login instance.
+ */
+ void configure(Map<String, ?> configs, String loginContextName);
+
+ /**
+ * Performs login for each login module specified for the login context of this instance.
+ */
+ LoginContext login() throws LoginException;
+
+ /**
+ * Returns the authenticated subject of this login context.
+ */
+ Subject subject();
+
+ /**
+ * Returns the service name to be used for SASL.
+ */
+ String serviceName();
+
+ /**
+ * Closes this instance.
+ */
+ void close();
+}
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/security/authenticator/AbstractLogin.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/AbstractLogin.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/AbstractLogin.java
new file mode 100644
index 0000000..2fe43ab
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/AbstractLogin.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.authenticator;
+
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+import javax.security.sasl.RealmCallback;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.Subject;
+
+import org.apache.kafka.common.security.JaasUtils;
+import org.apache.kafka.common.security.auth.Login;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Base login class that implements methods common to typical SASL mechanisms.
+ */
+public abstract class AbstractLogin implements Login {
+ private static final Logger log = LoggerFactory.getLogger(AbstractLogin.class);
+
+ private String loginContextName;
+ private LoginContext loginContext;
+
+
+ @Override
+ public void configure(Map<String, ?> configs, String loginContextName) {
+ this.loginContextName = loginContextName;
+ }
+
+ @Override
+ public LoginContext login() throws LoginException {
+ String jaasConfigFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM);
+ if (jaasConfigFile == null) {
+ log.debug("System property '" + JaasUtils.JAVA_LOGIN_CONFIG_PARAM + "' is not set, using default JAAS configuration.");
+ }
+ AppConfigurationEntry[] configEntries = Configuration.getConfiguration().getAppConfigurationEntry(loginContextName);
+ if (configEntries == null) {
+ String errorMessage = "Could not find a '" + loginContextName + "' entry in the JAAS configuration. System property '" +
+ JaasUtils.JAVA_LOGIN_CONFIG_PARAM + "' is " + (jaasConfigFile == null ? "not set" : jaasConfigFile);
+ throw new IllegalArgumentException(errorMessage);
+ }
+
+ loginContext = new LoginContext(loginContextName, new LoginCallbackHandler());
+ loginContext.login();
+ log.info("Successfully logged in.");
+ return loginContext;
+ }
+
+ @Override
+ public Subject subject() {
+ return loginContext.getSubject();
+ }
+
+ /**
+ * Callback handler for creating login context. Login callback handlers
+ * should support the callbacks required for the login modules used by
+ * the KafkaServer and KafkaClient contexts. Kafka does not support
+ * callback handlers which require additional user input.
+ *
+ */
+ public static class LoginCallbackHandler implements CallbackHandler {
+
+ @Override
+ public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
+ for (Callback callback : callbacks) {
+ if (callback instanceof NameCallback) {
+ NameCallback nc = (NameCallback) callback;
+ nc.setName(nc.getDefaultName());
+ } else if (callback instanceof PasswordCallback) {
+ String errorMessage = "Could not login: the client is being asked for a password, but the Kafka" +
+ " client code does not currently support obtaining a password from the user.";
+ throw new UnsupportedCallbackException(callback, errorMessage);
+ } else if (callback instanceof RealmCallback) {
+ RealmCallback rc = (RealmCallback) callback;
+ rc.setText(rc.getDefaultText());
+ } else {
+ throw new UnsupportedCallbackException(callback, "Unrecognized SASL Login callback");
+ }
+ }
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultLogin.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultLogin.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultLogin.java
new file mode 100644
index 0000000..0a405bc
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultLogin.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.authenticator;
+
+public class DefaultLogin extends AbstractLogin {
+
+ @Override
+ public String serviceName() {
+ return "kafka";
+ }
+
+ @Override
+ public void close() {
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
new file mode 100644
index 0000000..9aec9a7
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.authenticator;
+
+import javax.security.auth.Subject;
+import javax.security.auth.login.LoginException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.Map;
+
+import org.apache.kafka.common.network.LoginType;
+import org.apache.kafka.common.security.auth.Login;
+import org.apache.kafka.common.security.kerberos.KerberosLogin;
+
+public class LoginManager {
+
+ private static final EnumMap<LoginType, LoginManager> CACHED_INSTANCES = new EnumMap<>(LoginType.class);
+
+ private final Login login;
+ private final LoginType loginType;
+ private int refCount;
+
+ private LoginManager(LoginType loginType, boolean hasKerberos, Map<String, ?> configs) throws IOException, LoginException {
+ this.loginType = loginType;
+ String loginContext = loginType.contextName();
+ login = hasKerberos ? new KerberosLogin() : new DefaultLogin();
+ login.configure(configs, loginContext);
+ login.login();
+ }
+
+ /**
+ * Returns an instance of `LoginManager` and increases its reference count.
+ *
+ * `release()` should be invoked when the `LoginManager` is no longer needed. This method will try to reuse an
+ * existing `LoginManager` for the provided `loginType` if available. However, it expects `configs` to be the same for
+ * every invocation and it will ignore them in the case where it's returning a cached instance of `LoginManager`.
+ *
+ * This is a bit ugly and it would be nicer if we could pass the `LoginManager` to `ChannelBuilders.create` and
+ * shut it down when the broker or clients are closed. It's straightforward to do the former, but it's more
+ * complicated to do the latter without making the consumer API more complex.
+ *
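+ * @param loginType the type of the login context, it should be SERVER for the broker and CLIENT for the clients (i.e. consumer and producer)
+ * @param hasKerberos if true a `KerberosLogin` is created, otherwise a `DefaultLogin`; ignored when a cached instance is returned
+ * @param configs configuration as key/value pairs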
+ */
+ public static final LoginManager acquireLoginManager(LoginType loginType, boolean hasKerberos, Map<String, ?> configs) throws IOException, LoginException {
+ synchronized (LoginManager.class) {
+ LoginManager loginManager = CACHED_INSTANCES.get(loginType);
+ if (loginManager == null) {
+ loginManager = new LoginManager(loginType, hasKerberos, configs);
+ CACHED_INSTANCES.put(loginType, loginManager);
+ }
+ return loginManager.acquire();
+ }
+ }
+
+ public Subject subject() {
+ return login.subject();
+ }
+
+ public String serviceName() {
+ return login.serviceName();
+ }
+
+ private LoginManager acquire() {
+ ++refCount;
+ return this;
+ }
+
+ /**
+ * Decrease the reference count for this instance and release resources if it reaches 0.
+ */
+ public void release() {
+ synchronized (LoginManager.class) {
+ if (refCount == 0)
+ throw new IllegalStateException("release called on LoginManager with refCount == 0");
+ else if (refCount == 1) {
+ CACHED_INSTANCES.remove(loginType);
+ login.close();
+ }
+ --refCount;
+ }
+ }
+
+ /* Should only be used in tests. */
+ public static void closeAll() {
+ synchronized (LoginManager.class) {
+ for (LoginType loginType : new ArrayList<>(CACHED_INSTANCES.keySet())) {
+ LoginManager loginManager = CACHED_INSTANCES.remove(loginType);
+ loginManager.login.close();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
index 370e729..ba201dc 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
@@ -23,38 +23,44 @@ import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.util.Arrays;
import java.util.Map;
-
import java.security.Principal;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import javax.security.auth.Subject;
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.sasl.AuthorizeCallback;
-import javax.security.sasl.RealmCallback;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.IllegalSaslStateException;
+import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka.common.network.Authenticator;
+import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.TransportLayer;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.requests.RequestSend;
+import org.apache.kafka.common.requests.SaslHandshakeRequest;
+import org.apache.kafka.common.requests.SaslHandshakeResponse;
+import org.apache.kafka.common.security.auth.AuthCallbackHandler;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.PrincipalBuilder;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.common.KafkaException;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SaslClientAuthenticator implements Authenticator {
public enum SaslState {
- INITIAL, INTERMEDIATE, COMPLETE, FAILED
+ SEND_HANDSHAKE_REQUEST, RECEIVE_HANDSHAKE_RESPONSE, INITIAL, INTERMEDIATE, COMPLETE, FAILED
}
private static final Logger LOG = LoggerFactory.getLogger(SaslClientAuthenticator.class);
@@ -63,33 +69,57 @@ public class SaslClientAuthenticator implements Authenticator {
private final String servicePrincipal;
private final String host;
private final String node;
+ private final String mechanism;
+ private final boolean handshakeRequestEnable;
// assigned in `configure`
private SaslClient saslClient;
+ private Map<String, ?> configs;
private String clientPrincipalName;
+ private AuthCallbackHandler callbackHandler;
private TransportLayer transportLayer;
// buffers used in `authenticate`
private NetworkReceive netInBuffer;
private NetworkSend netOutBuffer;
- private SaslState saslState = SaslState.INITIAL;
+ // Current SASL state
+ private SaslState saslState;
+ // Next SASL state to be set when outgoing writes associated with the current SASL state complete
+ private SaslState pendingSaslState;
+ // Correlation ID for the next request
+ private int correlationId;
+ // Request header for which response from the server is pending
+ private RequestHeader currentRequestHeader;
- public SaslClientAuthenticator(String node, Subject subject, String servicePrincipal, String host) throws IOException {
+ public SaslClientAuthenticator(String node, Subject subject, String servicePrincipal, String host, String mechanism, boolean handshakeRequestEnable) throws IOException {
this.node = node;
this.subject = subject;
this.host = host;
this.servicePrincipal = servicePrincipal;
+ this.mechanism = mechanism;
+ this.handshakeRequestEnable = handshakeRequestEnable;
+ this.correlationId = -1;
}
public void configure(TransportLayer transportLayer, PrincipalBuilder principalBuilder, Map<String, ?> configs) throws KafkaException {
try {
this.transportLayer = transportLayer;
+ this.configs = configs;
+
+ setSaslState(handshakeRequestEnable ? SaslState.SEND_HANDSHAKE_REQUEST : SaslState.INITIAL);
// determine client principal from subject.
- Principal clientPrincipal = subject.getPrincipals().iterator().next();
- this.clientPrincipalName = clientPrincipal.getName();
- this.saslClient = createSaslClient();
+ if (!subject.getPrincipals().isEmpty()) {
+ Principal clientPrincipal = subject.getPrincipals().iterator().next();
+ this.clientPrincipalName = clientPrincipal.getName();
+ } else {
+ clientPrincipalName = null;
+ }
+ callbackHandler = new SaslClientCallbackHandler();
+ callbackHandler.configure(configs, Mode.CLIENT, subject, mechanism);
+
+ saslClient = createSaslClient();
} catch (Exception e) {
throw new KafkaException("Failed to configure SaslClientAuthenticator", e);
}
@@ -99,15 +129,14 @@ public class SaslClientAuthenticator implements Authenticator {
try {
return Subject.doAs(subject, new PrivilegedExceptionAction<SaslClient>() {
public SaslClient run() throws SaslException {
- String[] mechs = {"GSSAPI"};
+ String[] mechs = {mechanism};
LOG.debug("Creating SaslClient: client={};service={};serviceHostname={};mechs={}",
clientPrincipalName, servicePrincipal, host, Arrays.toString(mechs));
- return Sasl.createSaslClient(mechs, clientPrincipalName, servicePrincipal, host, null,
- new ClientCallbackHandler());
+ return Sasl.createSaslClient(mechs, clientPrincipalName, servicePrincipal, host, configs, callbackHandler);
}
});
} catch (PrivilegedActionException e) {
- throw new KafkaException("Failed to create SaslClient", e.getCause());
+ throw new KafkaException("Failed to create SaslClient with mechanism " + mechanism, e.getCause());
}
}
@@ -123,22 +152,39 @@ public class SaslClientAuthenticator implements Authenticator {
return;
switch (saslState) {
+ case SEND_HANDSHAKE_REQUEST:
+ String clientId = (String) configs.get(CommonClientConfigs.CLIENT_ID_CONFIG);
+ currentRequestHeader = new RequestHeader(ApiKeys.SASL_HANDSHAKE.id, clientId, correlationId++);
+ SaslHandshakeRequest handshakeRequest = new SaslHandshakeRequest(mechanism);
+ send(RequestSend.serialize(currentRequestHeader, handshakeRequest.toStruct()));
+ setSaslState(SaslState.RECEIVE_HANDSHAKE_RESPONSE);
+ break;
+ case RECEIVE_HANDSHAKE_RESPONSE:
+ byte[] responseBytes = receiveResponseOrToken();
+ if (responseBytes == null)
+ break;
+ else {
+ try {
+ handleKafkaResponse(currentRequestHeader, responseBytes);
+ currentRequestHeader = null;
+ } catch (Exception e) {
+ setSaslState(SaslState.FAILED);
+ throw e;
+ }
+ setSaslState(SaslState.INITIAL);
+ // Fall through and start SASL authentication using the configured client mechanism
+ }
case INITIAL:
- sendSaslToken(new byte[0]);
- saslState = SaslState.INTERMEDIATE;
+ sendSaslToken(new byte[0], true);
+ setSaslState(SaslState.INTERMEDIATE);
break;
case INTERMEDIATE:
- if (netInBuffer == null) netInBuffer = new NetworkReceive(node);
- netInBuffer.readFrom(transportLayer);
- if (netInBuffer.complete()) {
- netInBuffer.payload().rewind();
- byte[] serverToken = new byte[netInBuffer.payload().remaining()];
- netInBuffer.payload().get(serverToken, 0, serverToken.length);
- netInBuffer = null; // reset the networkReceive as we read all the data.
- sendSaslToken(serverToken);
+ byte[] serverToken = receiveResponseOrToken();
+ if (serverToken != null) {
+ sendSaslToken(serverToken, false);
}
if (saslClient.isComplete()) {
- saslState = SaslState.COMPLETE;
+ setSaslState(SaslState.COMPLETE);
transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
}
break;
@@ -149,30 +195,58 @@ public class SaslClientAuthenticator implements Authenticator {
}
}
- private void sendSaslToken(byte[] serverToken) throws IOException {
+ private void setSaslState(SaslState saslState) {
+ if (netOutBuffer != null && !netOutBuffer.completed())
+ pendingSaslState = saslState;
+ else {
+ this.pendingSaslState = null;
+ this.saslState = saslState;
+ LOG.debug("Set SASL client state to {}", saslState);
+ }
+ }
+
+ private void sendSaslToken(byte[] serverToken, boolean isInitial) throws IOException {
if (!saslClient.isComplete()) {
- try {
- byte[] saslToken = createSaslToken(serverToken);
- if (saslToken != null) {
- netOutBuffer = new NetworkSend(node, ByteBuffer.wrap(saslToken));
- flushNetOutBufferAndUpdateInterestOps();
- }
- } catch (IOException e) {
- saslState = SaslState.FAILED;
- throw e;
- }
+ byte[] saslToken = createSaslToken(serverToken, isInitial);
+ if (saslToken != null)
+ send(ByteBuffer.wrap(saslToken));
+ }
+ }
+
+ private void send(ByteBuffer buffer) throws IOException {
+ try {
+ netOutBuffer = new NetworkSend(node, buffer);
+ flushNetOutBufferAndUpdateInterestOps();
+ } catch (IOException e) {
+ setSaslState(SaslState.FAILED);
+ throw e;
}
}
private boolean flushNetOutBufferAndUpdateInterestOps() throws IOException {
boolean flushedCompletely = flushNetOutBuffer();
- if (flushedCompletely)
+ if (flushedCompletely) {
transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
- else
+ if (pendingSaslState != null)
+ setSaslState(pendingSaslState);
+ } else
transportLayer.addInterestOps(SelectionKey.OP_WRITE);
return flushedCompletely;
}
+ private byte[] receiveResponseOrToken() throws IOException {
+ if (netInBuffer == null) netInBuffer = new NetworkReceive(node);
+ netInBuffer.readFrom(transportLayer);
+ byte[] serverPacket = null;
+ if (netInBuffer.complete()) {
+ netInBuffer.payload().rewind();
+ serverPacket = new byte[netInBuffer.payload().remaining()];
+ netInBuffer.payload().get(serverPacket, 0, serverPacket.length);
+ netInBuffer = null; // reset the networkReceive as we read all the data.
+ }
+ return serverPacket;
+ }
+
public Principal principal() {
return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, clientPrincipalName);
}
@@ -182,19 +256,25 @@ public class SaslClientAuthenticator implements Authenticator {
}
public void close() throws IOException {
- saslClient.dispose();
+ if (saslClient != null)
+ saslClient.dispose();
+ if (callbackHandler != null)
+ callbackHandler.close();
}
- private byte[] createSaslToken(final byte[] saslToken) throws SaslException {
+ private byte[] createSaslToken(final byte[] saslToken, boolean isInitial) throws SaslException {
if (saslToken == null)
throw new SaslException("Error authenticating with the Kafka Broker: received a `null` saslToken.");
try {
- return Subject.doAs(subject, new PrivilegedExceptionAction<byte[]>() {
- public byte[] run() throws SaslException {
- return saslClient.evaluateChallenge(saslToken);
- }
- });
+ if (isInitial && !saslClient.hasInitialResponse())
+ return saslToken;
+ else
+ return Subject.doAs(subject, new PrivilegedExceptionAction<byte[]>() {
+ public byte[] run() throws SaslException {
+ return saslClient.evaluateChallenge(saslToken);
+ }
+ });
} catch (PrivilegedActionException e) {
String error = "An error: (" + e + ") occurred when evaluating SASL token received from the Kafka Broker.";
// Try to provide hints to use about what went wrong so they can fix their configuration.
@@ -221,35 +301,39 @@ public class SaslClientAuthenticator implements Authenticator {
return netOutBuffer.completed();
}
- public static class ClientCallbackHandler implements CallbackHandler {
-
- public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
- for (Callback callback : callbacks) {
- if (callback instanceof NameCallback) {
- NameCallback nc = (NameCallback) callback;
- nc.setName(nc.getDefaultName());
- } else if (callback instanceof PasswordCallback) {
- // Call `setPassword` once we support obtaining a password from the user and update message below
- throw new UnsupportedCallbackException(callback, "Could not login: the client is being asked for a password, but the Kafka" +
- " client code does not currently support obtaining a password from the user." +
- " Make sure -Djava.security.auth.login.config property passed to JVM and" +
- " the client is configured to use a ticket cache (using" +
- " the JAAS configuration setting 'useTicketCache=true)'. Make sure you are using" +
- " FQDN of the Kafka broker you are trying to connect to.");
- } else if (callback instanceof RealmCallback) {
- RealmCallback rc = (RealmCallback) callback;
- rc.setText(rc.getDefaultText());
- } else if (callback instanceof AuthorizeCallback) {
- AuthorizeCallback ac = (AuthorizeCallback) callback;
- String authId = ac.getAuthenticationID();
- String authzId = ac.getAuthorizationID();
- ac.setAuthorized(authId.equals(authzId));
- if (ac.isAuthorized())
- ac.setAuthorizedID(authzId);
- } else {
- throw new UnsupportedCallbackException(callback, "Unrecognized SASL ClientCallback");
- }
- }
+ private void handleKafkaResponse(RequestHeader requestHeader, byte[] responseBytes) {
+ Struct struct;
+ ApiKeys apiKey;
+ try {
+ struct = NetworkClient.parseResponse(ByteBuffer.wrap(responseBytes), requestHeader);
+ apiKey = ApiKeys.forId(requestHeader.apiKey());
+ } catch (SchemaException | IllegalArgumentException e) {
+ LOG.debug("Invalid SASL mechanism response, server may be expecting only GSSAPI tokens");
+ throw new AuthenticationException("Invalid SASL mechanism response", e);
+ }
+ switch (apiKey) {
+ case SASL_HANDSHAKE:
+ handleSaslHandshakeResponse(new SaslHandshakeResponse(struct));
+ break;
+ default:
+ throw new IllegalStateException("Unexpected API key during handshake: " + apiKey);
+ }
+ }
+
+ private void handleSaslHandshakeResponse(SaslHandshakeResponse response) {
+ Errors error = Errors.forCode(response.errorCode());
+ switch (error) {
+ case NONE:
+ break;
+ case UNSUPPORTED_SASL_MECHANISM:
+ throw new UnsupportedSaslMechanismException(String.format("Client SASL mechanism '%s' not enabled in the server, enabled mechanisms are %s",
+ mechanism, response.enabledMechanisms()));
+ case ILLEGAL_SASL_STATE:
+ throw new IllegalSaslStateException(String.format("Unexpected handshake request with client mechanism %s, enabled mechanisms are %s",
+ mechanism, response.enabledMechanisms()));
+ default:
+ throw new AuthenticationException(String.format("Unknown error code %d, client mechanism is %s, enabled mechanisms are %s",
+ response.errorCode(), mechanism, response.enabledMechanisms()));
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
new file mode 100644
index 0000000..8e0b8db
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.authenticator;
+
+import java.util.Map;
+
+import javax.security.auth.Subject;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.network.Mode;
+import org.apache.kafka.common.security.auth.AuthCallbackHandler;
+
+/**
+ * Callback handler for SASL clients. The callbacks required for the SASL mechanism
+ * configured for the client should be supported by this callback handler. See
+ * <a href="https://docs.oracle.com/javase/8/docs/technotes/guides/security/sasl/sasl-refguide.html">Java SASL API</a>
+ * for the list of callbacks required for each SASL mechanism.
+ */
+public class SaslClientCallbackHandler implements AuthCallbackHandler {
+
+ private boolean isKerberos;
+ private Subject subject;
+
+ @Override
+ public void configure(Map<String, ?> configs, Mode mode, Subject subject, String mechanism) {
+ this.isKerberos = mechanism.equals(SaslConfigs.GSSAPI_MECHANISM);
+ this.subject = subject;
+ }
+
+ @Override
+ public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
+ for (Callback callback : callbacks) {
+ if (callback instanceof NameCallback) {
+ NameCallback nc = (NameCallback) callback;
+ if (!isKerberos && subject != null && !subject.getPublicCredentials(String.class).isEmpty()) {
+ nc.setName(subject.getPublicCredentials(String.class).iterator().next());
+ } else
+ nc.setName(nc.getDefaultName());
+ } else if (callback instanceof PasswordCallback) {
+ if (!isKerberos && subject != null && !subject.getPrivateCredentials(String.class).isEmpty()) {
+ char [] password = subject.getPrivateCredentials(String.class).iterator().next().toCharArray();
+ ((PasswordCallback) callback).setPassword(password);
+ } else {
+ String errorMessage = "Could not login: the client is being asked for a password, but the Kafka" +
+ " client code does not currently support obtaining a password from the user.";
+ if (isKerberos) {
+ errorMessage += " Make sure -Djava.security.auth.login.config property passed to JVM and" +
+ " the client is configured to use a ticket cache (using" +
+ " the JAAS configuration setting 'useTicketCache=true)'. Make sure you are using" +
+ " FQDN of the Kafka broker you are trying to connect to.";
+ }
+ throw new UnsupportedCallbackException(callback, errorMessage);
+ }
+ } else if (callback instanceof RealmCallback) {
+ RealmCallback rc = (RealmCallback) callback;
+ rc.setText(rc.getDefaultText());
+ } else if (callback instanceof AuthorizeCallback) {
+ AuthorizeCallback ac = (AuthorizeCallback) callback;
+ String authId = ac.getAuthenticationID();
+ String authzId = ac.getAuthorizationID();
+ ac.setAuthorized(authId.equals(authzId));
+ if (ac.isAuthorized())
+ ac.setAuthorizedID(authzId);
+ } else {
+ throw new UnsupportedCallbackException(callback, "Unrecognized SASL ClientCallback");
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ }
+}
\ No newline at end of file