You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sr...@apache.org on 2016/01/30 12:35:15 UTC

kafka git commit: KAFKA-3169; Limit receive buffer size for SASL packets in broker

Repository: kafka
Updated Branches:
  refs/heads/trunk afda7c4dc -> b433b4a24


KAFKA-3169; Limit receive buffer size for SASL packets in broker

Limit the receive buffer size so that invalid SASL packets cannot cause an out-of-memory error (OOM) in the broker

Author: Rajini Sivaram <ra...@googlemail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>, Sriharsha Chintalapani <ha...@hortonworks.com>

Closes #831 from rajinisivaram/KAFKA-3169


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b433b4a2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b433b4a2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b433b4a2

Branch: refs/heads/trunk
Commit: b433b4a2470896d90d9dc596bb932030869d5d67
Parents: afda7c4
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Sat Jan 30 17:05:01 2016 +0530
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Sat Jan 30 17:05:01 2016 +0530

----------------------------------------------------------------------
 .../org/apache/kafka/common/network/SaslChannelBuilder.java    | 2 +-
 .../common/security/authenticator/SaslServerAuthenticator.java | 6 ++++--
 2 files changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b433b4a2/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index 34a87c9..b3db4e1 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -81,7 +81,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
             TransportLayer transportLayer = buildTransportLayer(id, key, socketChannel);
             Authenticator authenticator;
             if (mode == Mode.SERVER)
-                authenticator = new SaslServerAuthenticator(id, loginManager.subject(), kerberosShortNamer);
+                authenticator = new SaslServerAuthenticator(id, loginManager.subject(), kerberosShortNamer, maxReceiveSize);
             else
                 authenticator = new SaslClientAuthenticator(id, loginManager.subject(), loginManager.serviceName(),
                         socketChannel.socket().getInetAddress().getHostName());

http://git-wip-us.apache.org/repos/asf/kafka/blob/b433b4a2/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index b4d99d2..1f925f9 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -60,6 +60,7 @@ public class SaslServerAuthenticator implements Authenticator {
     private final Subject subject;
     private final String node;
     private final KerberosShortNamer kerberosNamer;
+    private final int maxReceiveSize;
 
     // assigned in `configure`
     private TransportLayer transportLayer;
@@ -68,7 +69,7 @@ public class SaslServerAuthenticator implements Authenticator {
     private NetworkReceive netInBuffer;
     private NetworkSend netOutBuffer;
 
-    public SaslServerAuthenticator(String node, final Subject subject, KerberosShortNamer kerberosNameParser) throws IOException {
+    public SaslServerAuthenticator(String node, final Subject subject, KerberosShortNamer kerberosNameParser, int maxReceiveSize) throws IOException {
         if (subject == null)
             throw new IllegalArgumentException("subject cannot be null");
         if (subject.getPrincipals().isEmpty())
@@ -76,6 +77,7 @@ public class SaslServerAuthenticator implements Authenticator {
         this.node = node;
         this.subject = subject;
         this.kerberosNamer = kerberosNameParser;
+        this.maxReceiveSize = maxReceiveSize;
         saslServer = createSaslServer();
     }
 
@@ -149,7 +151,7 @@ public class SaslServerAuthenticator implements Authenticator {
             return;
         }
 
-        if (netInBuffer == null) netInBuffer = new NetworkReceive(node);
+        if (netInBuffer == null) netInBuffer = new NetworkReceive(maxReceiveSize, node);
 
         netInBuffer.readFrom(transportLayer);