You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2017/10/04 17:45:11 UTC

kafka git commit: KAFKA-6004; Allow authentication providers to override error message

Repository: kafka
Updated Branches:
  refs/heads/trunk 5afeddaa9 -> 9949e1ed1


KAFKA-6004; Allow authentication providers to override error message

Author: Rajini Sivaram <ra...@googlemail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #4015 from rajinisivaram/KAFKA-6004-auth-exception


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9949e1ed
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9949e1ed
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9949e1ed

Branch: refs/heads/trunk
Commit: 9949e1ed1be8293ba60dcab10345fbcce8d61e36
Parents: 5afedda
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Wed Oct 4 13:44:46 2017 -0400
Committer: Rajini Sivaram <ra...@googlemail.com>
Committed: Wed Oct 4 13:44:46 2017 -0400

----------------------------------------------------------------------
 .../errors/SaslAuthenticationException.java     |  8 ++++++
 .../authenticator/SaslServerAuthenticator.java  |  7 +++--
 .../common/security/plain/PlainSaslServer.java  | 17 +++++++++--
 .../common/security/scram/ScramSaslServer.java  | 14 +++++++--
 .../kafka/common/network/NetworkTestUtils.java  |  7 +++--
 .../authenticator/SaslAuthenticatorTest.java    | 30 +++++++++++++++-----
 .../security/plain/PlainSaslServerTest.java     |  5 ++--
 .../security/scram/ScramSaslServerTest.java     |  5 ++--
 docs/upgrade.html                               |  3 ++
 9 files changed, 74 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9949e1ed/clients/src/main/java/org/apache/kafka/common/errors/SaslAuthenticationException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/SaslAuthenticationException.java b/clients/src/main/java/org/apache/kafka/common/errors/SaslAuthenticationException.java
index d128c25..c6bc8bd 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/SaslAuthenticationException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/SaslAuthenticationException.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.common.errors;
 
+import javax.security.sasl.SaslServer;
+
 /**
  * This exception indicates that SASL authentication has failed. The error message
  * in the exception indicates the actual cause of failure.
@@ -24,6 +26,12 @@ package org.apache.kafka.common.errors;
  * could also include other failures specific to the SASL mechanism used
  * for authentication.
  * </p>
+ * <p><b>Note:</b> If {@link SaslServer#evaluateResponse(byte[])} throws this exception during
+ * authentication, the message from the exception will be sent to clients in the SaslAuthenticate
+ * response. Custom {@link SaslServer} implementations may throw this exception in order to
+ * provide custom error messages to clients, but should take care not to include any
+ * security-critical information in the message that should not be leaked to unauthenticated clients.
+ * </p>
  */
 public class SaslAuthenticationException extends AuthenticationException {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9949e1ed/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index 355e365..4e565d8 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.IllegalSaslStateException;
 import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.SaslAuthenticationException;
 import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.network.Authenticator;
@@ -380,9 +381,11 @@ public class SaslServerAuthenticator implements Authenticator {
                 // For versions with SASL_AUTHENTICATE header, send a response to SASL_AUTHENTICATE request even if token is empty.
                 ByteBuffer responseBuf = responseToken == null ? EMPTY_BUFFER : ByteBuffer.wrap(responseToken);
                 sendKafkaResponse(requestContext, new SaslAuthenticateResponse(Errors.NONE, null, responseBuf));
-            } catch (SaslException e) {
+            } catch (SaslAuthenticationException | SaslException e) {
+                String errorMessage = e instanceof SaslAuthenticationException ? e.getMessage() :
+                    "Authentication failed due to invalid credentials with SASL mechanism " + saslMechanism;
                 sendKafkaResponse(requestContext, new SaslAuthenticateResponse(Errors.SASL_AUTHENTICATION_FAILED,
-                        "Authentication failed due to invalid credentials with SASL mechanism " + saslMechanism));
+                        errorMessage));
                 throw e;
             }
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9949e1ed/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServer.java b/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServer.java
index 93faee0..e54887f 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServer.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServer.java
@@ -26,6 +26,7 @@ import javax.security.sasl.SaslException;
 import javax.security.sasl.SaslServer;
 import javax.security.sasl.SaslServerFactory;
 
+import org.apache.kafka.common.errors.SaslAuthenticationException;
 import org.apache.kafka.common.security.JaasContext;
 import org.apache.kafka.common.security.authenticator.SaslServerCallbackHandler;
 
@@ -56,8 +57,18 @@ public class PlainSaslServer implements SaslServer {
         this.jaasContext = jaasContext;
     }
 
+    /**
+     * @throws SaslAuthenticationException if username/password combination is invalid or if the requested
+     *         authorization id is not the same as username.
+     * <p>
+     * <b>Note:</b> This method may throw {@link SaslAuthenticationException} to provide custom error messages
+     * to clients. But care should be taken to avoid including any information in the exception message that
+     * should not be leaked to unauthenticated clients. It may be safer to throw {@link SaslException} in
+     * some cases so that a standard error message is returned to clients.
+     * </p>
+     */
     @Override
-    public byte[] evaluateResponse(byte[] response) throws SaslException {
+    public byte[] evaluateResponse(byte[] response) throws SaslException, SaslAuthenticationException {
         /*
          * Message format (from https://tools.ietf.org/html/rfc4616):
          *
@@ -93,11 +104,11 @@ public class PlainSaslServer implements SaslServer {
         String expectedPassword = jaasContext.configEntryOption(JAAS_USER_PREFIX + username,
                 PlainLoginModule.class.getName());
         if (!password.equals(expectedPassword)) {
-            throw new SaslException("Authentication failed: Invalid username or password");
+            throw new SaslAuthenticationException("Authentication failed: Invalid username or password");
         }
 
         if (!authorizationIdFromClient.isEmpty() && !authorizationIdFromClient.equals(username))
-            throw new SaslException("Authentication failed: Client requested an authorization id that is different from username");
+            throw new SaslAuthenticationException("Authentication failed: Client requested an authorization id that is different from username");
 
         this.authorizationId = username;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9949e1ed/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServer.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServer.java
index 04b9afa..b250e15 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServer.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServer.java
@@ -32,6 +32,7 @@ import javax.security.sasl.SaslServer;
 import javax.security.sasl.SaslServerFactory;
 
 import org.apache.kafka.common.errors.IllegalSaslStateException;
+import org.apache.kafka.common.errors.SaslAuthenticationException;
 import org.apache.kafka.common.security.scram.ScramMessages.ClientFinalMessage;
 import org.apache.kafka.common.security.scram.ScramMessages.ClientFirstMessage;
 import org.apache.kafka.common.security.scram.ScramMessages.ServerFinalMessage;
@@ -73,8 +74,17 @@ public class ScramSaslServer implements SaslServer {
         setState(State.RECEIVE_CLIENT_FIRST_MESSAGE);
     }
 
+    /**
+     * @throws SaslAuthenticationException if the requested authorization id is not the same as username.
+     * <p>
+     * <b>Note:</b> This method may throw {@link SaslAuthenticationException} to provide custom error messages
+     * to clients. But care should be taken to avoid including any information in the exception message that
+     * should not be leaked to unauthenticated clients. It may be safer to throw {@link SaslException} in
+     * most cases so that a standard error message is returned to clients.
+     * </p>
+     */
     @Override
-    public byte[] evaluateResponse(byte[] response) throws SaslException {
+    public byte[] evaluateResponse(byte[] response) throws SaslException, SaslAuthenticationException {
         try {
             switch (state) {
                 case RECEIVE_CLIENT_FIRST_MESSAGE:
@@ -91,7 +101,7 @@ public class ScramSaslServer implements SaslServer {
                             throw new SaslException("Authentication failed: Invalid user credentials");
                         String authorizationIdFromClient = clientFirstMessage.authorizationId();
                         if (!authorizationIdFromClient.isEmpty() && !authorizationIdFromClient.equals(username))
-                            throw new SaslException("Authentication failed: Client requested an authorization id that is different from username");
+                            throw new SaslAuthenticationException("Authentication failed: Client requested an authorization id that is different from username");
 
                         if (scramCredential.iterations() < mechanism.minIterations())
                             throw new SaslException("Iterations " + scramCredential.iterations() +  " is less than the minimum " + mechanism.minIterations() + " for " + mechanism);

http://git-wip-us.apache.org/repos/asf/kafka/blob/9949e1ed/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java b/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
index 9518315..5998049 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
@@ -79,7 +79,8 @@ public class NetworkTestUtils {
         assertTrue(selector.isChannelReady(node));
     }
 
-    public static void waitForChannelClose(Selector selector, String node, ChannelState.State channelState) throws IOException {
+    public static ChannelState waitForChannelClose(Selector selector, String node, ChannelState.State channelState)
+            throws IOException {
         boolean closed = false;
         for (int i = 0; i < 30; i++) {
             selector.poll(1000L);
@@ -89,6 +90,8 @@ public class NetworkTestUtils {
             }
         }
         assertTrue("Channel was not closed by timeout", closed);
-        assertEquals(channelState, selector.disconnected().get(node).state());
+        ChannelState finalState = selector.disconnected().get(node);
+        assertEquals(channelState, finalState.state());
+        return finalState;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9949e1ed/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index 3b9e32b..c442672 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.errors.SaslAuthenticationException;
 import org.apache.kafka.common.network.CertStores;
 import org.apache.kafka.common.network.ChannelBuilder;
 import org.apache.kafka.common.network.ChannelBuilders;
@@ -154,7 +155,8 @@ public class SaslAuthenticatorTest {
         jaasConfig.setClientOptions("PLAIN", TestJaasConfig.USERNAME, "invalidpassword");
 
         server = createEchoServer(securityProtocol);
-        createAndCheckClientConnectionFailure(securityProtocol, node);
+        createAndCheckClientAuthenticationFailure(securityProtocol, node, "PLAIN",
+                "Authentication failed: Invalid username or password");
         server.verifyAuthenticationMetrics(0, 1);
     }
 
@@ -169,7 +171,8 @@ public class SaslAuthenticatorTest {
         jaasConfig.setClientOptions("PLAIN", "invaliduser", TestJaasConfig.PASSWORD);
 
         server = createEchoServer(securityProtocol);
-        createAndCheckClientConnectionFailure(securityProtocol, node);
+        createAndCheckClientAuthenticationFailure(securityProtocol, node, "PLAIN",
+                "Authentication failed: Invalid username or password");
         server.verifyAuthenticationMetrics(0, 1);
     }
 
@@ -307,7 +310,7 @@ public class SaslAuthenticatorTest {
         String node = "0";
         server = createEchoServer(securityProtocol);
         updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD);
-        createAndCheckClientConnectionFailure(securityProtocol, node);
+        createAndCheckClientAuthenticationFailure(securityProtocol, node, "SCRAM-SHA-256", null);
         server.verifyAuthenticationMetrics(0, 1);
     }
 
@@ -326,7 +329,7 @@ public class SaslAuthenticatorTest {
         String node = "0";
         server = createEchoServer(securityProtocol);
         updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD);
-        createAndCheckClientConnectionFailure(securityProtocol, node);
+        createAndCheckClientAuthenticationFailure(securityProtocol, node, "SCRAM-SHA-256", null);
         server.verifyAuthenticationMetrics(0, 1);
     }
 
@@ -344,7 +347,7 @@ public class SaslAuthenticatorTest {
         server.credentialCache().cache(ScramMechanism.SCRAM_SHA_256.mechanismName(), ScramCredential.class).remove(TestJaasConfig.USERNAME);
         String node = "1";
         saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
-        createAndCheckClientConnectionFailure(securityProtocol, node);
+        createAndCheckClientAuthenticationFailure(securityProtocol, node, "SCRAM-SHA-256", null);
         server.verifyAuthenticationMetrics(0, 1);
 
         saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
@@ -1136,11 +1139,24 @@ public class SaslAuthenticatorTest {
         selector = null;
     }
 
-    private void createAndCheckClientConnectionFailure(SecurityProtocol securityProtocol, String node) throws Exception {
+    private void createAndCheckClientAuthenticationFailure(SecurityProtocol securityProtocol, String node,
+            String mechanism, String expectedErrorMessage) throws Exception {
+        ChannelState finalState = createAndCheckClientConnectionFailure(securityProtocol, node);
+        Exception exception = finalState.exception();
+        assertTrue("Invalid exception class " + exception.getClass(), exception instanceof SaslAuthenticationException);
+        if (expectedErrorMessage == null)
+            expectedErrorMessage = "Authentication failed due to invalid credentials with SASL mechanism " + mechanism;
+        assertEquals(expectedErrorMessage, exception.getMessage());
+    }
+
+    private ChannelState createAndCheckClientConnectionFailure(SecurityProtocol securityProtocol, String node)
+            throws Exception {
         createClientConnection(securityProtocol, node);
-        NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED);
+        ChannelState finalState = NetworkTestUtils.waitForChannelClose(selector, node,
+                ChannelState.State.AUTHENTICATION_FAILED);
         selector.close();
         selector = null;
+        return finalState;
     }
 
     private AbstractResponse sendKafkaRequestReceiveResponse(String node, ApiKeys apiKey, AbstractRequest request) throws IOException {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9949e1ed/clients/src/test/java/org/apache/kafka/common/security/plain/PlainSaslServerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/plain/PlainSaslServerTest.java b/clients/src/test/java/org/apache/kafka/common/security/plain/PlainSaslServerTest.java
index 5ca778b..4196db6 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/plain/PlainSaslServerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/plain/PlainSaslServerTest.java
@@ -23,10 +23,9 @@ import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 
-import javax.security.sasl.SaslException;
-
 import static org.junit.Assert.assertEquals;
 
+import org.apache.kafka.common.errors.SaslAuthenticationException;
 import org.apache.kafka.common.security.JaasContext;
 import org.apache.kafka.common.security.authenticator.TestJaasConfig;
 
@@ -62,7 +61,7 @@ public class PlainSaslServerTest {
         assertEquals(0, nextChallenge.length);
     }
 
-    @Test(expected = SaslException.class)
+    @Test(expected = SaslAuthenticationException.class)
     public void authorizatonIdNotEqualsAuthenticationId() throws Exception {
         saslServer.evaluateResponse(saslMessage(USER_B, USER_A, PASSWORD_A));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9949e1ed/clients/src/test/java/org/apache/kafka/common/security/scram/ScramSaslServerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/scram/ScramSaslServerTest.java b/clients/src/test/java/org/apache/kafka/common/security/scram/ScramSaslServerTest.java
index 9b329d0..fd7f988 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/scram/ScramSaslServerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/scram/ScramSaslServerTest.java
@@ -22,10 +22,9 @@ import org.junit.Test;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 
-import javax.security.sasl.SaslException;
-
 import static org.junit.Assert.assertTrue;
 
+import org.apache.kafka.common.errors.SaslAuthenticationException;
 import org.apache.kafka.common.security.authenticator.CredentialCache;
 
 public class ScramSaslServerTest {
@@ -60,7 +59,7 @@ public class ScramSaslServerTest {
         assertTrue("Next challenge is empty", nextChallenge.length > 0);
     }
 
-    @Test(expected = SaslException.class)
+    @Test(expected = SaslAuthenticationException.class)
     public void authorizatonIdNotEqualsAuthenticationId() throws Exception {
         saslServer.evaluateResponse(clientFirstMessage(USER_A, USER_B));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9949e1ed/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 7162876..5872c7c 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -70,6 +70,9 @@
     <li>SimpleAclAuthorizer now logs access denials to the authorizer log by default.</li>
     <li>Authentication failures are now reported to clients as one of the subclasses of <code>AuthenticationException</code>.
         No retries will be performed if a client connection fails authentication.</li>
+    <li>Custom <code>SaslServer</code> implementations may throw <code>SaslAuthenticationException</code> to provide an error
+        message to return to clients indicating the reason for authentication failure. Implementors should take care not to include
+        any security-critical information in the exception message that should not be leaked to unauthenticated clients.</li>
     <li>The <code>app-info</code> mbean registered with JMX to provide version and commit id will be deprecated and replaced with
         metrics providing these attributes.</li>
     <li>Kafka metrics may now contain non-numeric values. <code>org.apache.kafka.common.Metric#value()</code> has been deprecated and