Posted to commits@kafka.apache.org by mi...@apache.org on 2022/09/02 16:26:36 UTC

[kafka] 02/04: MINOR: Add configurable max receive size for SASL authentication requests

This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 50cb087e03e878555144b1c41d1d0b84f640ac90
Author: Manikumar Reddy <ma...@gmail.com>
AuthorDate: Mon May 16 19:25:02 2022 +0530

    MINOR: Add configurable max receive size for SASL authentication requests
    
    This adds a new configuration `sasl.server.max.receive.size` that sets the maximum receive size for requests before and during authentication.
    
    Reviewers: Tom Bentley <tb...@redhat.com>, Mickael Maison <mi...@gmail.com>
    
    Co-authored-by: Manikumar Reddy <ma...@gmail.com>
    Co-authored-by: Mickael Maison <mi...@gmail.com>
---
 checkstyle/suppressions.xml                        |  2 +
 .../config/internals/BrokerSecurityConfigs.java    |  6 +++
 .../authenticator/SaslServerAuthenticator.java     | 16 ++++++--
 .../kafka/common/security/TestSecurityConfig.java  |  2 +
 .../authenticator/SaslAuthenticatorTest.java       | 46 ++++++++++++++++++++++
 .../authenticator/SaslServerAuthenticatorTest.java |  6 +--
 core/src/main/scala/kafka/server/KafkaConfig.scala |  4 ++
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |  2 +
 8 files changed, 77 insertions(+), 7 deletions(-)
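
For operators, the new setting is a plain broker config with a 512 KB default. The sketch below shows
one hypothetical way to override it when assembling broker properties; the 128 KB value and the other
keys are illustrative only and not taken from this commit.

    import java.util.Properties;

    public class SaslReceiveSizeBrokerProps {
        public static void main(String[] args) {
            // Illustrative broker properties; only sasl.server.max.receive.size is
            // introduced by this commit, the other keys are standard broker settings.
            Properties brokerProps = new Properties();
            brokerProps.put("listeners", "SASL_PLAINTEXT://localhost:9092");
            brokerProps.put("sasl.enabled.mechanisms", "PLAIN");
            // Lower the pre-authentication receive limit from the 512 KB default
            // (524288 bytes) to 128 KB.
            brokerProps.put("sasl.server.max.receive.size", "131072");
            brokerProps.forEach((key, value) -> System.out.println(key + "=" + value));
        }
    }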

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 43ccfebad9..2e5e00813d 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -32,6 +32,8 @@
               files="(Fetcher|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|Utils|TransactionManager|TransactionManagerTest|KafkaAdminClient|NetworkClient|Admin|KafkaRaftClient|KafkaRaftClientTest|RaftClientTestContext).java"/>
     <suppress checks="ClassFanOutComplexity"
               files="(SaslServerAuthenticator|SaslAuthenticatorTest).java"/>
+    <suppress checks="NPath"
+              files="SaslServerAuthenticator.java"/>
     <suppress checks="ClassFanOutComplexity"
               files="Errors.java"/>
     <suppress checks="ClassFanOutComplexity"
diff --git a/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
index 0b90da8f80..8b7a9649c2 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
@@ -36,6 +36,8 @@ public class BrokerSecurityConfigs {
     public static final String SASL_SERVER_CALLBACK_HANDLER_CLASS = "sasl.server.callback.handler.class";
     public static final String SSL_PRINCIPAL_MAPPING_RULES_CONFIG = "ssl.principal.mapping.rules";
     public static final String CONNECTIONS_MAX_REAUTH_MS = "connections.max.reauth.ms";
+    public static final int DEFAULT_SASL_SERVER_MAX_RECEIVE_SIZE = 524288;
+    public static final String SASL_SERVER_MAX_RECEIVE_SIZE_CONFIG = "sasl.server.max.receive.size";
 
     public static final String PRINCIPAL_BUILDER_CLASS_DOC = "The fully qualified name of a class that implements the " +
             "KafkaPrincipalBuilder interface, which is used to build the KafkaPrincipal object used during " +
@@ -89,4 +91,8 @@ public class BrokerSecurityConfigs {
             + "The broker will disconnect any such connection that is not re-authenticated within the session lifetime and that is then subsequently "
             + "used for any purpose other than re-authentication. Configuration names can optionally be prefixed with listener prefix and SASL "
             + "mechanism name in lower-case. For example, listener.name.sasl_ssl.oauthbearer.connections.max.reauth.ms=3600000";
+
+    public static final String SASL_SERVER_MAX_RECEIVE_SIZE_DOC = "The maximum receive size allowed before and during initial SASL authentication." +
+            " Default receive size is 512KB. GSSAPI limits requests to 64K, but we allow upto 512KB by default for custom SASL mechanisms. In practice," +
+            " PLAIN, SCRAM and OAUTH mechanisms can use much smaller limits.";
 }
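
The constants above are registered with the broker config machinery later in this commit (see the
TestSecurityConfig and KafkaConfig changes below). As a rough standalone illustration of that pattern,
the hypothetical sketch below defines and reads the setting through ConfigDef/AbstractConfig; it assumes
a clients jar that already contains this change and is not itself part of the diff.

    import java.util.Collections;
    import java.util.Map;

    import org.apache.kafka.common.config.AbstractConfig;
    import org.apache.kafka.common.config.ConfigDef;
    import org.apache.kafka.common.config.ConfigDef.Importance;
    import org.apache.kafka.common.config.ConfigDef.Type;
    import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;

    public class SaslReceiveSizeConfigSketch extends AbstractConfig {
        private static final ConfigDef CONFIG = new ConfigDef()
            .define(BrokerSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_CONFIG, Type.INT,
                    BrokerSecurityConfigs.DEFAULT_SASL_SERVER_MAX_RECEIVE_SIZE,
                    Importance.LOW, BrokerSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_DOC);

        public SaslReceiveSizeConfigSketch(Map<?, ?> originals) {
            super(CONFIG, originals);
        }

        public static void main(String[] args) {
            // Falls back to the 524288-byte default when the key is absent.
            System.out.println(new SaslReceiveSizeConfigSketch(Collections.emptyMap())
                .getInt(BrokerSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_CONFIG));
            // An explicit override, e.g. the 1 KB limit used by the new test further down.
            System.out.println(new SaslReceiveSizeConfigSketch(
                Collections.singletonMap(BrokerSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_CONFIG, "1024"))
                .getInt(BrokerSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_CONFIG));
        }
    }
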
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index 6e35ee7a90..a48d7472dc 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.SaslAuthenticateResponseData;
 import org.apache.kafka.common.message.SaslHandshakeResponseData;
+import org.apache.kafka.common.network.InvalidReceiveException;
 import org.apache.kafka.common.network.Authenticator;
 import org.apache.kafka.common.network.ByteBufferSend;
 import org.apache.kafka.common.network.ChannelBuilders;
@@ -88,8 +89,6 @@ import java.util.Optional;
 import java.util.function.Supplier;
 
 public class SaslServerAuthenticator implements Authenticator {
-    // GSSAPI limits requests to 64K, but we allow a bit extra for custom SASL mechanisms
-    static final int MAX_RECEIVE_SIZE = 524288;
     private static final Logger LOG = LoggerFactory.getLogger(SaslServerAuthenticator.class);
 
     /**
@@ -140,6 +139,7 @@ public class SaslServerAuthenticator implements Authenticator {
     private String saslMechanism;
 
     // buffers used in `authenticate`
+    private Integer saslAuthRequestMaxReceiveSize;
     private NetworkReceive netInBuffer;
     private Send netOutBuffer;
     private Send authenticationFailureSend = null;
@@ -189,6 +189,10 @@ public class SaslServerAuthenticator implements Authenticator {
         // Note that the old principal builder does not support SASL, so we do not need to pass the
         // authenticator or the transport layer
         this.principalBuilder = ChannelBuilders.createPrincipalBuilder(configs, kerberosNameParser, null);
+
+        saslAuthRequestMaxReceiveSize = (Integer) configs.get(BrokerSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_CONFIG);
+        if (saslAuthRequestMaxReceiveSize == null)
+            saslAuthRequestMaxReceiveSize = BrokerSecurityConfigs.DEFAULT_SASL_SERVER_MAX_RECEIVE_SIZE;
     }
 
     private void createSaslServer(String mechanism) throws IOException {
@@ -252,9 +256,13 @@ public class SaslServerAuthenticator implements Authenticator {
             }
 
             // allocate on heap (as opposed to any socket server memory pool)
-            if (netInBuffer == null) netInBuffer = new NetworkReceive(MAX_RECEIVE_SIZE, connectionId);
+            if (netInBuffer == null) netInBuffer = new NetworkReceive(saslAuthRequestMaxReceiveSize, connectionId);
 
-            netInBuffer.readFrom(transportLayer);
+            try {
+                netInBuffer.readFrom(transportLayer);
+            } catch (InvalidReceiveException e) {
+                throw new SaslAuthenticationException("Failing SASL authentication due to invalid receive size", e);
+            }
             if (!netInBuffer.complete())
                 return;
             netInBuffer.payload().rewind();
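
To make the behaviour change above concrete: the authenticator reads a 4-byte size prefix before
buffering the payload, and a prefix larger than the configured limit is now surfaced as a
SaslAuthenticationException (an authentication failure) rather than a raw InvalidReceiveException.
The standalone sketch below imitates that check with plain java.nio types and hypothetical exception
classes; it is not Kafka's NetworkReceive or SaslServerAuthenticator implementation.

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.Channels;
    import java.nio.channels.ReadableByteChannel;
    import java.nio.charset.StandardCharsets;

    public class SizeCappedSaslReceiveSketch {
        // Hypothetical stand-ins for Kafka's InvalidReceiveException and
        // SaslAuthenticationException; they are not the real classes.
        static class InvalidFrameSizeException extends IOException {
            InvalidFrameSizeException(String message) { super(message); }
        }
        static class AuthenticationFailedException extends RuntimeException {
            AuthenticationFailedException(String message, Throwable cause) { super(message, cause); }
        }

        // Reads one length-prefixed frame, enforcing the maximum receive size
        // before any payload buffer is allocated.
        static ByteBuffer readFrame(ReadableByteChannel channel, int maxReceiveSize) throws IOException {
            ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
            while (sizeBuffer.hasRemaining())
                if (channel.read(sizeBuffer) < 0) throw new IOException("channel closed while reading size");
            sizeBuffer.flip();
            int size = sizeBuffer.getInt();
            if (size < 0 || size > maxReceiveSize)
                throw new InvalidFrameSizeException("Invalid receive size " + size + " (limit " + maxReceiveSize + ")");
            ByteBuffer payload = ByteBuffer.allocate(size);
            while (payload.hasRemaining())
                if (channel.read(payload) < 0) throw new IOException("channel closed while reading payload");
            payload.flip();
            return payload;
        }

        // Mirrors the translation done in authenticate(): a size violation while a
        // client is still authenticating is reported as an authentication failure.
        static ByteBuffer readAuthFrame(ReadableByteChannel channel, int maxReceiveSize) throws IOException {
            try {
                return readFrame(channel, maxReceiveSize);
            } catch (InvalidFrameSizeException e) {
                throw new AuthenticationFailedException("Failing SASL authentication due to invalid receive size", e);
            }
        }

        public static void main(String[] args) throws IOException {
            // A well-formed 4-byte frame ("ping") passes under a 1 KB limit.
            byte[] frame = ByteBuffer.allocate(8).putInt(4).put("ping".getBytes(StandardCharsets.UTF_8)).array();
            ReadableByteChannel channel = Channels.newChannel(new ByteArrayInputStream(frame));
            System.out.println(readAuthFrame(channel, 1024).remaining()); // prints 4
        }
    }
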
diff --git a/clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java b/clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java
index 07cbb7856d..197151f5fb 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java
@@ -38,6 +38,8 @@ public class TestSecurityConfig extends AbstractConfig {
                     null, Importance.MEDIUM, BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
             .define(BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS, Type.LONG, 0L, Importance.MEDIUM,
                     BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS_DOC)
+            .define(BrokerSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_CONFIG, Type.INT, BrokerSecurityConfigs.DEFAULT_SASL_SERVER_MAX_RECEIVE_SIZE,
+                    Importance.LOW, BrokerSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_DOC)
             .withClientSslSupport()
             .withClientSaslSupport();
 
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index 988a0f2823..40a27935f3 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -212,6 +212,52 @@ public class SaslAuthenticatorTest {
         checkAuthenticationAndReauthentication(securityProtocol, node);
     }
 
+    /**
+     * Test SASL/PLAIN with the sasl.server.max.receive.size config
+     */
+    @Test
+    public void testSaslAuthenticationMaxReceiveSize() throws Exception {
+        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
+        configureMechanisms("PLAIN", Collections.singletonList("PLAIN"));
+
+        // test auth with 1KB receive size
+        saslServerConfigs.put(BrokerSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_CONFIG, "1024");
+        server = createEchoServer(securityProtocol);
+
+        // test valid sasl authentication
+        String node1 = "valid";
+        checkAuthenticationAndReauthentication(securityProtocol, node1);
+
+        // test a handshake request with a large mechanism string
+        byte[] bytes = new byte[1024];
+        new Random().nextBytes(bytes);
+        String mechanism = new String(bytes, StandardCharsets.UTF_8);
+        String node2 = "invalid1";
+        createClientConnection(SecurityProtocol.PLAINTEXT, node2);
+        SaslHandshakeRequest handshakeRequest = buildSaslHandshakeRequest(mechanism, ApiKeys.SASL_HANDSHAKE.latestVersion());
+        RequestHeader header = new RequestHeader(ApiKeys.SASL_HANDSHAKE, handshakeRequest.version(), "someclient", nextCorrelationId++);
+        NetworkSend send = new NetworkSend(node2, handshakeRequest.toSend(header));
+        selector.send(send);
+        // we expect an exception on the server and the connection to be closed
+        NetworkTestUtils.waitForChannelClose(selector, node2, ChannelState.READY.state());
+        selector.close();
+
+        String node3 = "invalid2";
+        createClientConnection(SecurityProtocol.PLAINTEXT, node3);
+        sendHandshakeRequestReceiveResponse(node3, ApiKeys.SASL_HANDSHAKE.latestVersion());
+
+        // test a sasl authenticate request with a large auth_bytes field
+        String authString = "\u0000" + TestJaasConfig.USERNAME + "\u0000" +  new String(bytes, StandardCharsets.UTF_8);
+        ByteBuffer authBuf = ByteBuffer.wrap(Utils.utf8(authString));
+        SaslAuthenticateRequestData data = new SaslAuthenticateRequestData().setAuthBytes(authBuf.array());
+        SaslAuthenticateRequest request = new SaslAuthenticateRequest.Builder(data).build();
+        header = new RequestHeader(ApiKeys.SASL_AUTHENTICATE, request.version(), "someclient", nextCorrelationId++);
+        send = new NetworkSend(node3, request.toSend(header));
+        selector.send(send);
+        NetworkTestUtils.waitForChannelClose(selector, node3, ChannelState.READY.state());
+        server.verifyAuthenticationMetrics(1, 2);
+    }
+
     /**
      * Tests that SASL/PLAIN clients with invalid password fail authentication.
      */
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
index af0fedd4f5..8245f57516 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
@@ -19,11 +19,11 @@ package org.apache.kafka.common.security.authenticator;
 import java.net.InetAddress;
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.errors.IllegalSaslStateException;
+import org.apache.kafka.common.errors.SaslAuthenticationException;
 import org.apache.kafka.common.message.ApiMessageType;
 import org.apache.kafka.common.network.ChannelMetadataRegistry;
 import org.apache.kafka.common.network.ClientInformation;
 import org.apache.kafka.common.network.DefaultChannelMetadataRegistry;
-import org.apache.kafka.common.network.InvalidReceiveException;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.network.TransportLayer;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -68,10 +68,10 @@ public class SaslServerAuthenticatorTest {
             SCRAM_SHA_256.mechanismName(), new DefaultChannelMetadataRegistry());
 
         when(transportLayer.read(any(ByteBuffer.class))).then(invocation -> {
-            invocation.<ByteBuffer>getArgument(0).putInt(SaslServerAuthenticator.MAX_RECEIVE_SIZE + 1);
+            invocation.<ByteBuffer>getArgument(0).putInt(BrokerSecurityConfigs.DEFAULT_SASL_SERVER_MAX_RECEIVE_SIZE + 1);
             return 4;
         });
-        assertThrows(InvalidReceiveException.class, authenticator::authenticate);
+        assertThrows(SaslAuthenticationException.class, authenticator::authenticate);
         verify(transportLayer).read(any(ByteBuffer.class));
     }
 
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 15c5d09502..09b2bc5dea 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -255,6 +255,7 @@ object Defaults {
 
     /** ********* General Security configuration ***********/
   val ConnectionsMaxReauthMsDefault = 0L
+  val DefaultServerMaxMaxReceiveSize = BrokerSecurityConfigs.DEFAULT_SASL_SERVER_MAX_RECEIVE_SIZE
   val DefaultPrincipalSerde = classOf[DefaultKafkaPrincipalBuilder]
 
   /** ********* Sasl configuration ***********/
@@ -559,6 +560,7 @@ object KafkaConfig {
   /** ******** Common Security Configuration *************/
   val PrincipalBuilderClassProp = BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG
   val ConnectionsMaxReauthMsProp = BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS
+  val SaslServerMaxReceiveSizeProp = BrokerSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_CONFIG
   val securityProviderClassProp = SecurityConfig.SECURITY_PROVIDERS_CONFIG
 
   /** ********* SSL Configuration ****************/
@@ -987,6 +989,7 @@ object KafkaConfig {
   /** ******** Common Security Configuration *************/
   val PrincipalBuilderClassDoc = BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_DOC
   val ConnectionsMaxReauthMsDoc = BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS_DOC
+  val SaslServerMaxReceiveSizeDoc = BrokerSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_DOC
   val securityProviderClassDoc = SecurityConfig.SECURITY_PROVIDERS_DOC
 
   /** ********* SSL Configuration ****************/
@@ -1291,6 +1294,7 @@ object KafkaConfig {
 
       /** ********* General Security Configuration ****************/
       .define(ConnectionsMaxReauthMsProp, LONG, Defaults.ConnectionsMaxReauthMsDefault, MEDIUM, ConnectionsMaxReauthMsDoc)
+      .define(SaslServerMaxReceiveSizeProp, INT, Defaults.DefaultServerMaxMaxReceiveSize, MEDIUM, SaslServerMaxReceiveSizeDoc)
       .define(securityProviderClassProp, STRING, null, LOW, securityProviderClassDoc)
 
       /** ********* SSL Configuration ****************/
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 2ce336e4ca..946effe782 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -944,6 +944,8 @@ class KafkaConfigTest {
         case KafkaConfig.KafkaMetricsReporterClassesProp => // ignore
         case KafkaConfig.KafkaMetricsPollingIntervalSecondsProp => //ignore
 
+        case KafkaConfig.SaslServerMaxReceiveSizeProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
+
         // Raft Quorum Configs
         case RaftConfig.QUORUM_VOTERS_CONFIG => // ignore string
         case RaftConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")