You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ma...@apache.org on 2022/09/08 18:18:07 UTC
[kafka] 02/02: MINOR: Add configurable max receive size for SASL authentication requests
This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 015d7aede6cbd350d56d75006930dd2bf89a4a5a
Author: Manikumar Reddy <ma...@gmail.com>
AuthorDate: Mon May 16 19:25:02 2022 +0530
MINOR: Add configurable max receive size for SASL authentication requests
This adds a new configuration `sasl.server.max.receive.size` that sets the maximum receive size for requests before and during authentication.
Reviewers: Tom Bentley <tb...@redhat.com>, Mickael Maison <mi...@gmail.com>
Co-authored-by: Manikumar Reddy <ma...@gmail.com>
Co-authored-by: Mickael Maison <mi...@gmail.com>
---
checkstyle/suppressions.xml | 2 +
.../config/internals/BrokerSecurityConfigs.java | 6 +++
.../authenticator/SaslServerAuthenticator.java | 16 ++++++--
.../kafka/common/security/TestSecurityConfig.java | 2 +
.../authenticator/SaslAuthenticatorTest.java | 46 ++++++++++++++++++++++
.../authenticator/SaslServerAuthenticatorTest.java | 6 +--
core/src/main/scala/kafka/server/KafkaConfig.scala | 4 ++
.../scala/unit/kafka/server/KafkaConfigTest.scala | 2 +
8 files changed, 77 insertions(+), 7 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 55fcd1a9e5..7c32223961 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -49,6 +49,8 @@
files="(Fetcher|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|Utils|TransactionManager|TransactionManagerTest|KafkaAdminClient|NetworkClient|Admin|KafkaRaftClient|KafkaRaftClientTest|RaftClientTestContext).java"/>
<suppress checks="ClassFanOutComplexity"
files="(SaslServerAuthenticator|SaslAuthenticatorTest).java"/>
+ <suppress checks="NPath"
+ files="SaslServerAuthenticator.java"/>
<suppress checks="ClassFanOutComplexity"
files="Errors.java"/>
<suppress checks="ClassFanOutComplexity"
diff --git a/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
index 0b90da8f80..8b7a9649c2 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
@@ -36,6 +36,8 @@ public class BrokerSecurityConfigs {
public static final String SASL_SERVER_CALLBACK_HANDLER_CLASS = "sasl.server.callback.handler.class";
public static final String SSL_PRINCIPAL_MAPPING_RULES_CONFIG = "ssl.principal.mapping.rules";
public static final String CONNECTIONS_MAX_REAUTH_MS = "connections.max.reauth.ms";
+ public static final int DEFAULT_SASL_SERVER_MAX_RECEIVE_SIZE = 524288;
+ public static final String SASL_SERVER_MAX_RECEIVE_SIZE_CONFIG = "sasl.server.max.receive.size";
public static final String PRINCIPAL_BUILDER_CLASS_DOC = "The fully qualified name of a class that implements the " +
"KafkaPrincipalBuilder interface, which is used to build the KafkaPrincipal object used during " +
@@ -89,4 +91,8 @@ public class BrokerSecurityConfigs {
+ "The broker will disconnect any such connection that is not re-authenticated within the session lifetime and that is then subsequently "
+ "used for any purpose other than re-authentication. Configuration names can optionally be prefixed with listener prefix and SASL "
+ "mechanism name in lower-case. For example, listener.name.sasl_ssl.oauthbearer.connections.max.reauth.ms=3600000";
+
+ public static final String SASL_SERVER_MAX_RECEIVE_SIZE_DOC = "The maximum receive size allowed before and during initial SASL authentication." +
+ " Default receive size is 512KB. GSSAPI limits requests to 64K, but we allow upto 512KB by default for custom SASL mechanisms. In practice," +
+ " PLAIN, SCRAM and OAUTH mechanisms can use much smaller limits.";
}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index 019723b6b4..51b02952a6 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.SaslAuthenticateResponseData;
import org.apache.kafka.common.message.SaslHandshakeResponseData;
+import org.apache.kafka.common.network.InvalidReceiveException;
import org.apache.kafka.common.network.Authenticator;
import org.apache.kafka.common.network.ByteBufferSend;
import org.apache.kafka.common.network.ChannelBuilders;
@@ -88,8 +89,6 @@ import java.util.Optional;
import java.util.function.Supplier;
public class SaslServerAuthenticator implements Authenticator {
- // GSSAPI limits requests to 64K, but we allow a bit extra for custom SASL mechanisms
- static final int MAX_RECEIVE_SIZE = 524288;
private static final Logger LOG = LoggerFactory.getLogger(SaslServerAuthenticator.class);
/**
@@ -140,6 +139,7 @@ public class SaslServerAuthenticator implements Authenticator {
private String saslMechanism;
// buffers used in `authenticate`
+ private Integer saslAuthRequestMaxReceiveSize;
private NetworkReceive netInBuffer;
private Send netOutBuffer;
private Send authenticationFailureSend = null;
@@ -189,6 +189,10 @@ public class SaslServerAuthenticator implements Authenticator {
// Note that the old principal builder does not support SASL, so we do not need to pass the
// authenticator or the transport layer
this.principalBuilder = ChannelBuilders.createPrincipalBuilder(configs, kerberosNameParser, null);
+
+ saslAuthRequestMaxReceiveSize = (Integer) configs.get(BrokerSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_CONFIG);
+ if (saslAuthRequestMaxReceiveSize == null)
+ saslAuthRequestMaxReceiveSize = BrokerSecurityConfigs.DEFAULT_SASL_SERVER_MAX_RECEIVE_SIZE;
}
private void createSaslServer(String mechanism) throws IOException {
@@ -252,9 +256,13 @@ public class SaslServerAuthenticator implements Authenticator {
}
// allocate on heap (as opposed to any socket server memory pool)
- if (netInBuffer == null) netInBuffer = new NetworkReceive(MAX_RECEIVE_SIZE, connectionId);
+ if (netInBuffer == null) netInBuffer = new NetworkReceive(saslAuthRequestMaxReceiveSize, connectionId);
- netInBuffer.readFrom(transportLayer);
+ try {
+ netInBuffer.readFrom(transportLayer);
+ } catch (InvalidReceiveException e) {
+ throw new SaslAuthenticationException("Failing SASL authentication due to invalid receive size", e);
+ }
if (!netInBuffer.complete())
return;
netInBuffer.payload().rewind();
diff --git a/clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java b/clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java
index 07cbb7856d..197151f5fb 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java
@@ -38,6 +38,8 @@ public class TestSecurityConfig extends AbstractConfig {
null, Importance.MEDIUM, BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
.define(BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS, Type.LONG, 0L, Importance.MEDIUM,
BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS_DOC)
+ .define(BrokerSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_CONFIG, Type.INT, BrokerSecurityConfigs.DEFAULT_SASL_SERVER_MAX_RECEIVE_SIZE,
+ Importance.LOW, BrokerSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_DOC)
.withClientSslSupport()
.withClientSaslSupport();
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index 988a0f2823..40a27935f3 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -212,6 +212,52 @@ public class SaslAuthenticatorTest {
checkAuthenticationAndReauthentication(securityProtocol, node);
}
+ /**
+ * Test SASL/PLAIN with sasl.authentication.max.receive.size config
+ */
+ @Test
+ public void testSaslAuthenticationMaxReceiveSize() throws Exception {
+ SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
+ configureMechanisms("PLAIN", Collections.singletonList("PLAIN"));
+
+ // test auth with 1KB receive size
+ saslServerConfigs.put(BrokerSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_CONFIG, "1024");
+ server = createEchoServer(securityProtocol);
+
+ // test valid sasl authentication
+ String node1 = "valid";
+ checkAuthenticationAndReauthentication(securityProtocol, node1);
+
+ // test with handshake request with large mechanism string
+ byte[] bytes = new byte[1024];
+ new Random().nextBytes(bytes);
+ String mechanism = new String(bytes, StandardCharsets.UTF_8);
+ String node2 = "invalid1";
+ createClientConnection(SecurityProtocol.PLAINTEXT, node2);
+ SaslHandshakeRequest handshakeRequest = buildSaslHandshakeRequest(mechanism, ApiKeys.SASL_HANDSHAKE.latestVersion());
+ RequestHeader header = new RequestHeader(ApiKeys.SASL_HANDSHAKE, handshakeRequest.version(), "someclient", nextCorrelationId++);
+ NetworkSend send = new NetworkSend(node2, handshakeRequest.toSend(header));
+ selector.send(send);
+ //we will get exception in server and connection gets closed.
+ NetworkTestUtils.waitForChannelClose(selector, node2, ChannelState.READY.state());
+ selector.close();
+
+ String node3 = "invalid2";
+ createClientConnection(SecurityProtocol.PLAINTEXT, node3);
+ sendHandshakeRequestReceiveResponse(node3, ApiKeys.SASL_HANDSHAKE.latestVersion());
+
+ // test with sasl authenticate request with large auth_byes string
+ String authString = "\u0000" + TestJaasConfig.USERNAME + "\u0000" + new String(bytes, StandardCharsets.UTF_8);
+ ByteBuffer authBuf = ByteBuffer.wrap(Utils.utf8(authString));
+ SaslAuthenticateRequestData data = new SaslAuthenticateRequestData().setAuthBytes(authBuf.array());
+ SaslAuthenticateRequest request = new SaslAuthenticateRequest.Builder(data).build();
+ header = new RequestHeader(ApiKeys.SASL_AUTHENTICATE, request.version(), "someclient", nextCorrelationId++);
+ send = new NetworkSend(node3, request.toSend(header));
+ selector.send(send);
+ NetworkTestUtils.waitForChannelClose(selector, node3, ChannelState.READY.state());
+ server.verifyAuthenticationMetrics(1, 2);
+ }
+
/**
* Tests that SASL/PLAIN clients with invalid password fail authentication.
*/
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
index 50696ecf05..b0dec3e522 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.security.authenticator;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.errors.IllegalSaslStateException;
+import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.message.SaslAuthenticateRequestData;
import org.apache.kafka.common.message.SaslHandshakeRequestData;
@@ -25,7 +26,6 @@ import org.apache.kafka.common.network.ChannelBuilders;
import org.apache.kafka.common.network.ChannelMetadataRegistry;
import org.apache.kafka.common.network.ClientInformation;
import org.apache.kafka.common.network.DefaultChannelMetadataRegistry;
-import org.apache.kafka.common.network.InvalidReceiveException;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.TransportLayer;
import org.apache.kafka.common.protocol.ApiKeys;
@@ -96,10 +96,10 @@ public class SaslServerAuthenticatorTest {
SCRAM_SHA_256.mechanismName(), new DefaultChannelMetadataRegistry());
when(transportLayer.read(any(ByteBuffer.class))).then(invocation -> {
- invocation.<ByteBuffer>getArgument(0).putInt(SaslServerAuthenticator.MAX_RECEIVE_SIZE + 1);
+ invocation.<ByteBuffer>getArgument(0).putInt(BrokerSecurityConfigs.DEFAULT_SASL_SERVER_MAX_RECEIVE_SIZE + 1);
return 4;
});
- assertThrows(InvalidReceiveException.class, authenticator::authenticate);
+ assertThrows(SaslAuthenticationException.class, authenticator::authenticate);
verify(transportLayer).read(any(ByteBuffer.class));
}
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 860056f9a3..8f4806185b 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -258,6 +258,7 @@ object Defaults {
/** ********* General Security configuration ***********/
val ConnectionsMaxReauthMsDefault = 0L
+ val DefaultServerMaxMaxReceiveSize = BrokerSecurityConfigs.DEFAULT_SASL_SERVER_MAX_RECEIVE_SIZE
val DefaultPrincipalSerde = classOf[DefaultKafkaPrincipalBuilder]
/** ********* Sasl configuration ***********/
@@ -565,6 +566,7 @@ object KafkaConfig {
/** ******** Common Security Configuration *************/
val PrincipalBuilderClassProp = BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG
val ConnectionsMaxReauthMsProp = BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS
+ val SaslServerMaxReceiveSizeProp = BrokerSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_CONFIG
val securityProviderClassProp = SecurityConfig.SECURITY_PROVIDERS_CONFIG
/** ********* SSL Configuration ****************/
@@ -1009,6 +1011,7 @@ object KafkaConfig {
/** ******** Common Security Configuration *************/
val PrincipalBuilderClassDoc = BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_DOC
val ConnectionsMaxReauthMsDoc = BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS_DOC
+ val SaslServerMaxReceiveSizeDoc = BrokerSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_DOC
val securityProviderClassDoc = SecurityConfig.SECURITY_PROVIDERS_DOC
/** ********* SSL Configuration ****************/
@@ -1315,6 +1318,7 @@ object KafkaConfig {
/** ********* General Security Configuration ****************/
.define(ConnectionsMaxReauthMsProp, LONG, Defaults.ConnectionsMaxReauthMsDefault, MEDIUM, ConnectionsMaxReauthMsDoc)
+ .define(SaslServerMaxReceiveSizeProp, INT, Defaults.DefaultServerMaxMaxReceiveSize, MEDIUM, SaslServerMaxReceiveSizeDoc)
.define(securityProviderClassProp, STRING, null, LOW, securityProviderClassDoc)
/** ********* SSL Configuration ****************/
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index ee638ba893..78adff5f8f 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -946,6 +946,8 @@ class KafkaConfigTest {
case KafkaConfig.KafkaMetricsReporterClassesProp => // ignore
case KafkaConfig.KafkaMetricsPollingIntervalSecondsProp => //ignore
+ case KafkaConfig.SaslServerMaxReceiveSizeProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
+
// Raft Quorum Configs
case RaftConfig.QUORUM_VOTERS_CONFIG => // ignore string
case RaftConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")