You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sr...@apache.org on 2016/01/30 12:35:15 UTC
kafka git commit: KAFKA-3169; Limit receive buffer size for SASL packets in broker
Repository: kafka
Updated Branches:
refs/heads/trunk afda7c4dc -> b433b4a24
KAFKA-3169; Limit receive buffer size for SASL packets in broker
Limit the receive buffer size to avoid an out-of-memory error (OOM) in the broker when it receives invalid SASL packets.
Author: Rajini Sivaram <ra...@googlemail.com>
Reviewers: Ismael Juma <is...@juma.me.uk>, Sriharsha Chintalapani <ha...@hortonworks.com>
Closes #831 from rajinisivaram/KAFKA-3169
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b433b4a2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b433b4a2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b433b4a2
Branch: refs/heads/trunk
Commit: b433b4a2470896d90d9dc596bb932030869d5d67
Parents: afda7c4
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Sat Jan 30 17:05:01 2016 +0530
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Sat Jan 30 17:05:01 2016 +0530
----------------------------------------------------------------------
.../org/apache/kafka/common/network/SaslChannelBuilder.java | 2 +-
.../common/security/authenticator/SaslServerAuthenticator.java | 6 ++++--
2 files changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b433b4a2/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index 34a87c9..b3db4e1 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -81,7 +81,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
TransportLayer transportLayer = buildTransportLayer(id, key, socketChannel);
Authenticator authenticator;
if (mode == Mode.SERVER)
- authenticator = new SaslServerAuthenticator(id, loginManager.subject(), kerberosShortNamer);
+ authenticator = new SaslServerAuthenticator(id, loginManager.subject(), kerberosShortNamer, maxReceiveSize);
else
authenticator = new SaslClientAuthenticator(id, loginManager.subject(), loginManager.serviceName(),
socketChannel.socket().getInetAddress().getHostName());
http://git-wip-us.apache.org/repos/asf/kafka/blob/b433b4a2/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index b4d99d2..1f925f9 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -60,6 +60,7 @@ public class SaslServerAuthenticator implements Authenticator {
private final Subject subject;
private final String node;
private final KerberosShortNamer kerberosNamer;
+ private final int maxReceiveSize;
// assigned in `configure`
private TransportLayer transportLayer;
@@ -68,7 +69,7 @@ public class SaslServerAuthenticator implements Authenticator {
private NetworkReceive netInBuffer;
private NetworkSend netOutBuffer;
- public SaslServerAuthenticator(String node, final Subject subject, KerberosShortNamer kerberosNameParser) throws IOException {
+ public SaslServerAuthenticator(String node, final Subject subject, KerberosShortNamer kerberosNameParser, int maxReceiveSize) throws IOException {
if (subject == null)
throw new IllegalArgumentException("subject cannot be null");
if (subject.getPrincipals().isEmpty())
@@ -76,6 +77,7 @@ public class SaslServerAuthenticator implements Authenticator {
this.node = node;
this.subject = subject;
this.kerberosNamer = kerberosNameParser;
+ this.maxReceiveSize = maxReceiveSize;
saslServer = createSaslServer();
}
@@ -149,7 +151,7 @@ public class SaslServerAuthenticator implements Authenticator {
return;
}
- if (netInBuffer == null) netInBuffer = new NetworkReceive(node);
+ if (netInBuffer == null) netInBuffer = new NetworkReceive(maxReceiveSize, node);
netInBuffer.readFrom(transportLayer);