You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2017/10/04 17:45:11 UTC
kafka git commit: KAFKA-6004;
Allow authentication providers to override error message
Repository: kafka
Updated Branches:
refs/heads/trunk 5afeddaa9 -> 9949e1ed1
KAFKA-6004; Allow authentication providers to override error message
Author: Rajini Sivaram <ra...@googlemail.com>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #4015 from rajinisivaram/KAFKA-6004-auth-exception
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9949e1ed
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9949e1ed
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9949e1ed
Branch: refs/heads/trunk
Commit: 9949e1ed1be8293ba60dcab10345fbcce8d61e36
Parents: 5afedda
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Wed Oct 4 13:44:46 2017 -0400
Committer: Rajini Sivaram <ra...@googlemail.com>
Committed: Wed Oct 4 13:44:46 2017 -0400
----------------------------------------------------------------------
.../errors/SaslAuthenticationException.java | 8 ++++++
.../authenticator/SaslServerAuthenticator.java | 7 +++--
.../common/security/plain/PlainSaslServer.java | 17 +++++++++--
.../common/security/scram/ScramSaslServer.java | 14 +++++++--
.../kafka/common/network/NetworkTestUtils.java | 7 +++--
.../authenticator/SaslAuthenticatorTest.java | 30 +++++++++++++++-----
.../security/plain/PlainSaslServerTest.java | 5 ++--
.../security/scram/ScramSaslServerTest.java | 5 ++--
docs/upgrade.html | 3 ++
9 files changed, 74 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9949e1ed/clients/src/main/java/org/apache/kafka/common/errors/SaslAuthenticationException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/SaslAuthenticationException.java b/clients/src/main/java/org/apache/kafka/common/errors/SaslAuthenticationException.java
index d128c25..c6bc8bd 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/SaslAuthenticationException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/SaslAuthenticationException.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.common.errors;
+import javax.security.sasl.SaslServer;
+
/**
* This exception indicates that SASL authentication has failed. The error message
* in the exception indicates the actual cause of failure.
@@ -24,6 +26,12 @@ package org.apache.kafka.common.errors;
* could also include other failures specific to the SASL mechanism used
* for authentication.
* </p>
+ * <p><b>Note:</b>If {@link SaslServer#evaluateResponse(byte[])} throws this exception during
+ * authentication, the message from the exception will be sent to clients in the SaslAuthenticate
+ * response. Custom {@link SaslServer} implementations may throw this exception in order to
+ * provide custom error messages to clients, but should take care not to include any
+ * security-critical information in the message that should not be leaked to unauthenticated clients.
+ * </p>
*/
public class SaslAuthenticationException extends AuthenticationException {
http://git-wip-us.apache.org/repos/asf/kafka/blob/9949e1ed/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index 355e365..4e565d8 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.IllegalSaslStateException;
import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.network.Authenticator;
@@ -380,9 +381,11 @@ public class SaslServerAuthenticator implements Authenticator {
// For versions with SASL_AUTHENTICATE header, send a response to SASL_AUTHENTICATE request even if token is empty.
ByteBuffer responseBuf = responseToken == null ? EMPTY_BUFFER : ByteBuffer.wrap(responseToken);
sendKafkaResponse(requestContext, new SaslAuthenticateResponse(Errors.NONE, null, responseBuf));
- } catch (SaslException e) {
+ } catch (SaslAuthenticationException | SaslException e) {
+ String errorMessage = e instanceof SaslAuthenticationException ? e.getMessage() :
+ "Authentication failed due to invalid credentials with SASL mechanism " + saslMechanism;
sendKafkaResponse(requestContext, new SaslAuthenticateResponse(Errors.SASL_AUTHENTICATION_FAILED,
- "Authentication failed due to invalid credentials with SASL mechanism " + saslMechanism));
+ errorMessage));
throw e;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9949e1ed/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServer.java b/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServer.java
index 93faee0..e54887f 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServer.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServer.java
@@ -26,6 +26,7 @@ import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import javax.security.sasl.SaslServerFactory;
+import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.authenticator.SaslServerCallbackHandler;
@@ -56,8 +57,18 @@ public class PlainSaslServer implements SaslServer {
this.jaasContext = jaasContext;
}
+ /**
+ * @throws SaslAuthenticationException if username/password combination is invalid or if the requested
+ * authorization id is not the same as username.
+ * <p>
+ * <b>Note:</b> This method may throw {@link SaslAuthenticationException} to provide custom error messages
+ * to clients. But care should be taken to avoid including any information in the exception message that
+ * should not be leaked to unauthenticated clients. It may be safer to throw {@link SaslException} in
+ * some cases so that a standard error message is returned to clients.
+ * </p>
+ */
@Override
- public byte[] evaluateResponse(byte[] response) throws SaslException {
+ public byte[] evaluateResponse(byte[] response) throws SaslException, SaslAuthenticationException {
/*
* Message format (from https://tools.ietf.org/html/rfc4616):
*
@@ -93,11 +104,11 @@ public class PlainSaslServer implements SaslServer {
String expectedPassword = jaasContext.configEntryOption(JAAS_USER_PREFIX + username,
PlainLoginModule.class.getName());
if (!password.equals(expectedPassword)) {
- throw new SaslException("Authentication failed: Invalid username or password");
+ throw new SaslAuthenticationException("Authentication failed: Invalid username or password");
}
if (!authorizationIdFromClient.isEmpty() && !authorizationIdFromClient.equals(username))
- throw new SaslException("Authentication failed: Client requested an authorization id that is different from username");
+ throw new SaslAuthenticationException("Authentication failed: Client requested an authorization id that is different from username");
this.authorizationId = username;
http://git-wip-us.apache.org/repos/asf/kafka/blob/9949e1ed/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServer.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServer.java
index 04b9afa..b250e15 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServer.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServer.java
@@ -32,6 +32,7 @@ import javax.security.sasl.SaslServer;
import javax.security.sasl.SaslServerFactory;
import org.apache.kafka.common.errors.IllegalSaslStateException;
+import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.security.scram.ScramMessages.ClientFinalMessage;
import org.apache.kafka.common.security.scram.ScramMessages.ClientFirstMessage;
import org.apache.kafka.common.security.scram.ScramMessages.ServerFinalMessage;
@@ -73,8 +74,17 @@ public class ScramSaslServer implements SaslServer {
setState(State.RECEIVE_CLIENT_FIRST_MESSAGE);
}
+ /**
+ * @throws SaslAuthenticationException if the requested authorization id is not the same as username.
+ * <p>
+ * <b>Note:</b> This method may throw {@link SaslAuthenticationException} to provide custom error messages
+ * to clients. But care should be taken to avoid including any information in the exception message that
+ * should not be leaked to unauthenticated clients. It may be safer to throw {@link SaslException} in
+ * most cases so that a standard error message is returned to clients.
+ * </p>
+ */
@Override
- public byte[] evaluateResponse(byte[] response) throws SaslException {
+ public byte[] evaluateResponse(byte[] response) throws SaslException, SaslAuthenticationException {
try {
switch (state) {
case RECEIVE_CLIENT_FIRST_MESSAGE:
@@ -91,7 +101,7 @@ public class ScramSaslServer implements SaslServer {
throw new SaslException("Authentication failed: Invalid user credentials");
String authorizationIdFromClient = clientFirstMessage.authorizationId();
if (!authorizationIdFromClient.isEmpty() && !authorizationIdFromClient.equals(username))
- throw new SaslException("Authentication failed: Client requested an authorization id that is different from username");
+ throw new SaslAuthenticationException("Authentication failed: Client requested an authorization id that is different from username");
if (scramCredential.iterations() < mechanism.minIterations())
throw new SaslException("Iterations " + scramCredential.iterations() + " is less than the minimum " + mechanism.minIterations() + " for " + mechanism);
http://git-wip-us.apache.org/repos/asf/kafka/blob/9949e1ed/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java b/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
index 9518315..5998049 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
@@ -79,7 +79,8 @@ public class NetworkTestUtils {
assertTrue(selector.isChannelReady(node));
}
- public static void waitForChannelClose(Selector selector, String node, ChannelState.State channelState) throws IOException {
+ public static ChannelState waitForChannelClose(Selector selector, String node, ChannelState.State channelState)
+ throws IOException {
boolean closed = false;
for (int i = 0; i < 30; i++) {
selector.poll(1000L);
@@ -89,6 +90,8 @@ public class NetworkTestUtils {
}
}
assertTrue("Channel was not closed by timeout", closed);
- assertEquals(channelState, selector.disconnected().get(node).state());
+ ChannelState finalState = selector.disconnected().get(node);
+ assertEquals(channelState, finalState.state());
+ return finalState;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9949e1ed/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index 3b9e32b..c442672 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.network.CertStores;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.ChannelBuilders;
@@ -154,7 +155,8 @@ public class SaslAuthenticatorTest {
jaasConfig.setClientOptions("PLAIN", TestJaasConfig.USERNAME, "invalidpassword");
server = createEchoServer(securityProtocol);
- createAndCheckClientConnectionFailure(securityProtocol, node);
+ createAndCheckClientAuthenticationFailure(securityProtocol, node, "PLAIN",
+ "Authentication failed: Invalid username or password");
server.verifyAuthenticationMetrics(0, 1);
}
@@ -169,7 +171,8 @@ public class SaslAuthenticatorTest {
jaasConfig.setClientOptions("PLAIN", "invaliduser", TestJaasConfig.PASSWORD);
server = createEchoServer(securityProtocol);
- createAndCheckClientConnectionFailure(securityProtocol, node);
+ createAndCheckClientAuthenticationFailure(securityProtocol, node, "PLAIN",
+ "Authentication failed: Invalid username or password");
server.verifyAuthenticationMetrics(0, 1);
}
@@ -307,7 +310,7 @@ public class SaslAuthenticatorTest {
String node = "0";
server = createEchoServer(securityProtocol);
updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD);
- createAndCheckClientConnectionFailure(securityProtocol, node);
+ createAndCheckClientAuthenticationFailure(securityProtocol, node, "SCRAM-SHA-256", null);
server.verifyAuthenticationMetrics(0, 1);
}
@@ -326,7 +329,7 @@ public class SaslAuthenticatorTest {
String node = "0";
server = createEchoServer(securityProtocol);
updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD);
- createAndCheckClientConnectionFailure(securityProtocol, node);
+ createAndCheckClientAuthenticationFailure(securityProtocol, node, "SCRAM-SHA-256", null);
server.verifyAuthenticationMetrics(0, 1);
}
@@ -344,7 +347,7 @@ public class SaslAuthenticatorTest {
server.credentialCache().cache(ScramMechanism.SCRAM_SHA_256.mechanismName(), ScramCredential.class).remove(TestJaasConfig.USERNAME);
String node = "1";
saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
- createAndCheckClientConnectionFailure(securityProtocol, node);
+ createAndCheckClientAuthenticationFailure(securityProtocol, node, "SCRAM-SHA-256", null);
server.verifyAuthenticationMetrics(0, 1);
saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
@@ -1136,11 +1139,24 @@ public class SaslAuthenticatorTest {
selector = null;
}
- private void createAndCheckClientConnectionFailure(SecurityProtocol securityProtocol, String node) throws Exception {
+ private void createAndCheckClientAuthenticationFailure(SecurityProtocol securityProtocol, String node,
+ String mechanism, String expectedErrorMessage) throws Exception {
+ ChannelState finalState = createAndCheckClientConnectionFailure(securityProtocol, node);
+ Exception exception = finalState.exception();
+ assertTrue("Invalid exception class " + exception.getClass(), exception instanceof SaslAuthenticationException);
+ if (expectedErrorMessage == null)
+ expectedErrorMessage = "Authentication failed due to invalid credentials with SASL mechanism " + mechanism;
+ assertEquals(expectedErrorMessage, exception.getMessage());
+ }
+
+ private ChannelState createAndCheckClientConnectionFailure(SecurityProtocol securityProtocol, String node)
+ throws Exception {
createClientConnection(securityProtocol, node);
- NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED);
+ ChannelState finalState = NetworkTestUtils.waitForChannelClose(selector, node,
+ ChannelState.State.AUTHENTICATION_FAILED);
selector.close();
selector = null;
+ return finalState;
}
private AbstractResponse sendKafkaRequestReceiveResponse(String node, ApiKeys apiKey, AbstractRequest request) throws IOException {
http://git-wip-us.apache.org/repos/asf/kafka/blob/9949e1ed/clients/src/test/java/org/apache/kafka/common/security/plain/PlainSaslServerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/plain/PlainSaslServerTest.java b/clients/src/test/java/org/apache/kafka/common/security/plain/PlainSaslServerTest.java
index 5ca778b..4196db6 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/plain/PlainSaslServerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/plain/PlainSaslServerTest.java
@@ -23,10 +23,9 @@ import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
-import javax.security.sasl.SaslException;
-
import static org.junit.Assert.assertEquals;
+import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.authenticator.TestJaasConfig;
@@ -62,7 +61,7 @@ public class PlainSaslServerTest {
assertEquals(0, nextChallenge.length);
}
- @Test(expected = SaslException.class)
+ @Test(expected = SaslAuthenticationException.class)
public void authorizatonIdNotEqualsAuthenticationId() throws Exception {
saslServer.evaluateResponse(saslMessage(USER_B, USER_A, PASSWORD_A));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9949e1ed/clients/src/test/java/org/apache/kafka/common/security/scram/ScramSaslServerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/scram/ScramSaslServerTest.java b/clients/src/test/java/org/apache/kafka/common/security/scram/ScramSaslServerTest.java
index 9b329d0..fd7f988 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/scram/ScramSaslServerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/scram/ScramSaslServerTest.java
@@ -22,10 +22,9 @@ import org.junit.Test;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
-import javax.security.sasl.SaslException;
-
import static org.junit.Assert.assertTrue;
+import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.security.authenticator.CredentialCache;
public class ScramSaslServerTest {
@@ -60,7 +59,7 @@ public class ScramSaslServerTest {
assertTrue("Next challenge is empty", nextChallenge.length > 0);
}
- @Test(expected = SaslException.class)
+ @Test(expected = SaslAuthenticationException.class)
public void authorizatonIdNotEqualsAuthenticationId() throws Exception {
saslServer.evaluateResponse(clientFirstMessage(USER_A, USER_B));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9949e1ed/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 7162876..5872c7c 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -70,6 +70,9 @@
<li>SimpleAclAuthorizer now logs access denials to the authorizer log by default.</li>
<li>Authentication failures are now reported to clients as one of the subclasses of <code>AuthenticationException</code>.
No retries will be performed if a client connection fails authentication.</li>
+ <li>Custom <code>SaslServer</code> implementations may throw <code>SaslAuthenticationException</code> to provide an error
+ message to return to clients indicating the reason for authentication failure. Implementors should take care not to include
+ any security-critical information in the exception message that should not be leaked to unauthenticated clients.</li>
<li>The <code>app-info</code> mbean registered with JMX to provide version and commit id will be deprecated and replaced with
metrics providing these attributes.</li>
<li>Kafka metrics may now contain non-numeric values. <code>org.apache.kafka.common.Metric#value()</code> has been deprecated and