Posted to commits@kafka.apache.org by ju...@apache.org on 2017/07/26 06:20:07 UTC

[1/2] kafka git commit: KAFKA-4602; KIP-72 - Allow putting a bound on memory consumed by Incoming requests

Repository: kafka
Updated Branches:
  refs/heads/trunk f15cdbc91 -> 47ee8e954


http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index cc061da..cc6a394 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -22,13 +22,22 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.nio.ByteBuffer;
 
+import java.util.Random;
+import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.memory.SimpleMemoryPool;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.utils.MockTime;
@@ -50,7 +59,7 @@ public class SelectorTest {
     protected Time time;
     protected Selector selector;
     protected ChannelBuilder channelBuilder;
-    private Metrics metrics;
+    protected Metrics metrics;
 
     @Before
     public void setUp() throws Exception {
@@ -322,6 +331,87 @@ public class SelectorTest {
         assertTrue("Unexpected receive", selector.completedReceives().isEmpty());
     }
 
+    @Test
+    public void testMuteOnOOM() throws Exception {
+        //clean up default selector, replace it with one that uses a finite mem pool
+        selector.close();
+        MemoryPool pool = new SimpleMemoryPool(900, 900, false, null);
+        selector = new Selector(NetworkReceive.UNLIMITED, 5000, metrics, time, "MetricGroup",
+            new HashMap<String, String>(), true, false, channelBuilder, pool);
+
+        try (ServerSocketChannel ss = ServerSocketChannel.open()) {
+            ss.bind(new InetSocketAddress(0));
+
+            InetSocketAddress serverAddress = (InetSocketAddress) ss.getLocalAddress();
+
+            Thread sender1 = createSender(serverAddress, randomPayload(900));
+            Thread sender2 = createSender(serverAddress, randomPayload(900));
+            sender1.start();
+            sender2.start();
+
+            //wait until everything has been flushed out to network (assuming payload size is smaller than OS buffer size)
+            //this is important because we assume both requests' prefixes (1st 4 bytes) have made it.
+            sender1.join(5000);
+            sender2.join(5000);
+
+            SocketChannel channelX = ss.accept(); //not defined whether it's sender 1 or 2
+            channelX.configureBlocking(false);
+            SocketChannel channelY = ss.accept();
+            channelY.configureBlocking(false);
+            selector.register("clientX", channelX);
+            selector.register("clientY", channelY);
+
+            List<NetworkReceive> completed = Collections.emptyList();
+            long deadline = System.currentTimeMillis() + 5000;
+            while (System.currentTimeMillis() < deadline && completed.isEmpty()) {
+                selector.poll(1000);
+                completed = selector.completedReceives();
+            }
+            assertEquals("could not read a single request within timeout", 1, completed.size());
+            NetworkReceive firstReceive = completed.get(0);
+            assertEquals(0, pool.availableMemory());
+            assertTrue(selector.isOutOfMemory());
+
+            selector.poll(10);
+            assertTrue(selector.completedReceives().isEmpty());
+            assertEquals(0, pool.availableMemory());
+            assertTrue(selector.isOutOfMemory());
+
+            firstReceive.close();
+            assertEquals(900, pool.availableMemory()); //memory has been released back to pool
+
+            completed = Collections.emptyList();
+            deadline = System.currentTimeMillis() + 5000;
+            while (System.currentTimeMillis() < deadline && completed.isEmpty()) {
+                selector.poll(1000);
+                completed = selector.completedReceives();
+            }
+            assertEquals("could not read a single request within timeout", 1, selector.completedReceives().size());
+            assertEquals(0, pool.availableMemory());
+            assertFalse(selector.isOutOfMemory());
+        }
+    }
+
+    private Thread createSender(InetSocketAddress serverAddress, byte[] payload) {
+        return new PlaintextSender(serverAddress, payload);
+    }
+
+    protected byte[] randomPayload(int sizeBytes) throws Exception {
+        Random random = new Random();
+        byte[] payload = new byte[sizeBytes + 4];
+        random.nextBytes(payload);
+        ByteArrayOutputStream prefixOs = new ByteArrayOutputStream();
+        DataOutputStream prefixDos = new DataOutputStream(prefixOs);
+        prefixDos.writeInt(sizeBytes);
+        prefixDos.flush();
+        prefixDos.close();
+        prefixOs.flush();
+        prefixOs.close();
+        byte[] prefix = prefixOs.toByteArray();
+        System.arraycopy(prefix, 0, payload, 0, prefix.length);
+        return payload;
+    }
+
     private String blockingRequest(String node, String s) throws IOException {
         selector.send(createSend(node, s));
         selector.poll(1000L);

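For reference, the payloads built by randomPayload() above follow Kafka's wire framing: a 4-byte big-endian length prefix followed by the body. A minimal equivalent sketch using ByteBuffer instead of a DataOutputStream (the method name is illustrative):

    import java.nio.ByteBuffer;
    import java.util.Random;

    //equivalent to randomPayload() above: 4-byte big-endian length prefix, then a random body
    static byte[] sizePrefixedPayload(int sizeBytes) {
        byte[] body = new byte[sizeBytes];
        new Random().nextBytes(body);
        ByteBuffer frame = ByteBuffer.allocate(4 + sizeBytes); //ByteBuffer is big-endian by default
        frame.putInt(sizeBytes);
        frame.put(body);
        return frame.array();
    }
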
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
index e272855..46d3b79 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
@@ -17,17 +17,23 @@
 package org.apache.kafka.common.network;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.nio.channels.SelectionKey;
+import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
+import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.memory.SimpleMemoryPool;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.protocol.SecurityProtocol;
@@ -35,6 +41,7 @@ import org.apache.kafka.common.security.ssl.SslFactory;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.test.TestSslUtils;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -43,7 +50,6 @@ import org.junit.Test;
  */
 public class SslSelectorTest extends SelectorTest {
 
-    private Metrics metrics;
     private Map<String, Object> sslClientConfigs;
 
     @Before
@@ -160,6 +166,90 @@ public class SslSelectorTest extends SelectorTest {
 
     }
 
+    @Override
+    public void testMuteOnOOM() throws Exception {
+        //clean up default selector, replace it with one that uses a finite mem pool
+        selector.close();
+        MemoryPool pool = new SimpleMemoryPool(900, 900, false, null);
+        //the initial channel builder is for clients, we need a server one
+        File trustStoreFile = File.createTempFile("truststore", ".jks");
+        Map<String, Object> sslServerConfigs = TestSslUtils.createSslConfig(false, true, Mode.SERVER, trustStoreFile, "server");
+        sslServerConfigs.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
+        channelBuilder = new SslChannelBuilder(Mode.SERVER);
+        channelBuilder.configure(sslServerConfigs);
+        selector = new Selector(NetworkReceive.UNLIMITED, 5000, metrics, time, "MetricGroup", 
+                new HashMap<String, String>(), true, false, channelBuilder, pool);
+
+        try (ServerSocketChannel ss = ServerSocketChannel.open()) {
+            ss.bind(new InetSocketAddress(0));
+
+            InetSocketAddress serverAddress = (InetSocketAddress) ss.getLocalAddress();
+
+            SslSender sender1 = createSender(serverAddress, randomPayload(900));
+            SslSender sender2 = createSender(serverAddress, randomPayload(900));
+            sender1.start();
+            sender2.start();
+
+            SocketChannel channelX = ss.accept(); //not defined whether it's sender 1 or 2
+            channelX.configureBlocking(false);
+            SocketChannel channelY = ss.accept();
+            channelY.configureBlocking(false);
+            selector.register("clientX", channelX);
+            selector.register("clientY", channelY);
+
+            boolean success = false;
+            NetworkReceive firstReceive = null;
+            long deadline = System.currentTimeMillis() + 5000;
+            //keep calling poll until:
+            //1. both senders have completed the handshakes (so server selector has tried reading both payloads)
+            //2. a single payload is actually read out completely (the other is too big to fit)
+            while (System.currentTimeMillis() < deadline) {
+                selector.poll(10);
+
+                List<NetworkReceive> completed = selector.completedReceives();
+                if (firstReceive == null) {
+                    if (!completed.isEmpty()) {
+                        assertEquals("expecting a single request", 1, completed.size());
+                        firstReceive = completed.get(0);
+                        assertTrue(selector.isMadeReadProgressLastPoll());
+                        assertEquals(0, pool.availableMemory());
+                    }
+                } else {
+                    assertTrue("only expecting single request", completed.isEmpty());
+                }
+
+                boolean handshaked = sender1.waitForHandshake(1);
+                handshaked = handshaked && sender2.waitForHandshake(1);
+
+                if (handshaked && firstReceive != null) {
+                    success = true;
+                    break;
+                }
+            }
+            if (!success) {
+                Assert.fail("could not initiate connections within timeout");
+            }
+
+            selector.poll(10);
+            assertTrue(selector.completedReceives().isEmpty());
+            assertEquals(0, pool.availableMemory());
+            assertTrue(selector.isOutOfMemory());
+
+            firstReceive.close();
+            assertEquals(900, pool.availableMemory()); //memory has been released back to pool
+
+            List<NetworkReceive> completed = Collections.emptyList();
+            deadline = System.currentTimeMillis() + 5000;
+            while (System.currentTimeMillis() < deadline && completed.isEmpty()) {
+                selector.poll(1000);
+                completed = selector.completedReceives();
+            }
+            assertEquals("could not read remaining request within timeout", 1, completed.size());
+            assertEquals(0, pool.availableMemory());
+            assertFalse(selector.isOutOfMemory());
+        }
+    }
+
     /**
      * Connects and waits for handshake to complete. This is required since SslTransportLayer
      * implementation requires the channel to be ready before send is invoked (unlike plaintext
@@ -169,4 +259,7 @@ public class SslSelectorTest extends SelectorTest {
         blockingConnect(node, serverAddr);
     }
 
+    private SslSender createSender(InetSocketAddress serverAddress, byte[] payload) {
+        return new SslSender(serverAddress, payload);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/test/java/org/apache/kafka/common/network/SslSender.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslSender.java b/clients/src/test/java/org/apache/kafka/common/network/SslSender.java
new file mode 100644
index 0000000..cae69cb
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSender.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSocket;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class SslSender extends Thread {
+
+    private final InetSocketAddress serverAddress;
+    private final byte[] payload;
+    private final CountDownLatch handshaked = new CountDownLatch(1);
+
+    public SslSender(InetSocketAddress serverAddress, byte[] payload) {
+        this.serverAddress = serverAddress;
+        this.payload = payload;
+        setDaemon(true);
+        setName("SslSender - " + payload.length + " bytes @ " + serverAddress);
+    }
+
+    @Override
+    public void run() {
+        try {
+            SSLContext sc = SSLContext.getInstance("TLSv1.2");
+            sc.init(null, new TrustManager[]{new NaiveTrustManager()}, new java.security.SecureRandom());
+            try (SSLSocket connection = (SSLSocket) sc.getSocketFactory().createSocket(serverAddress.getAddress(), serverAddress.getPort())) {
+                OutputStream os = connection.getOutputStream();
+                connection.startHandshake();
+                handshaked.countDown();
+                os.write(payload);
+                os.flush();
+            }
+        } catch (Exception e) {
+            e.printStackTrace(System.err);
+        }
+    }
+
+    public boolean waitForHandshake(long timeoutMillis) throws InterruptedException {
+        return handshaked.await(timeoutMillis, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * blindly trust any certificate presented to it
+     */
+    private static class NaiveTrustManager implements X509TrustManager {
+        @Override
+        public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
+            //nop
+        }
+
+        @Override
+        public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
+            //nop
+        }
+
+        @Override
+        public X509Certificate[] getAcceptedIssuers() {
+            return new X509Certificate[0];
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index bb5d2a7..8338ad7 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -39,6 +39,7 @@ import javax.net.ssl.SSLParameters;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.security.TestSecurityConfig;
 import org.apache.kafka.common.security.ssl.SslFactory;
 import org.apache.kafka.common.metrics.Metrics;
@@ -580,7 +581,7 @@ public class SslTransportLayerTest {
     public void testNetworkThreadTimeRecorded() throws Exception {
         selector.close();
         this.selector = new Selector(NetworkReceive.UNLIMITED, 5000, new Metrics(), Time.SYSTEM,
-                "MetricGroup", new HashMap<String, String>(), false, true, channelBuilder);
+                "MetricGroup", new HashMap<String, String>(), false, true, channelBuilder, MemoryPool.NONE);
 
         String node = "0";
         server = createEchoServer(SecurityProtocol.SSL);

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java
new file mode 100644
index 0000000..2b2cc91
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.protocol;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ProtoUtilsTest {
+    @Test
+    public void testDelayedAllocationSchemaDetection() throws Exception {
+        //verifies that schemas known to retain a reference to the underlying byte buffer are correctly detected.
+        for (ApiKeys key : ApiKeys.values()) {
+            if (key == ApiKeys.PRODUCE || key == ApiKeys.JOIN_GROUP || key == ApiKeys.SYNC_GROUP) {
+                Assert.assertTrue(Protocol.requiresDelayedDeallocation(key.id));
+            } else {
+                Assert.assertFalse(Protocol.requiresDelayedDeallocation(key.id));
+            }
+        }
+
+    }
+}

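For context, this property exists because some schemas (notably produce) parse BYTES/NULLABLE_BYTES fields as zero-copy slices of the receive buffer. A conceptual sketch of such a check, using a deliberately simplified hypothetical type rather than the real Schema/Type classes:

    import java.util.List;

    //hypothetical, simplified stand-in for the real schema classes
    class SketchField {
        final String type; //e.g. "INT32", "BYTES", "NULLABLE_BYTES", "STRUCT"
        final List<SketchField> children;

        SketchField(String type, List<SketchField> children) {
            this.type = type;
            this.children = children;
        }

        //true if parsing this field keeps a reference into the receive buffer
        boolean retainsBuffer() {
            if (type.equals("BYTES") || type.equals("NULLABLE_BYTES"))
                return true;
            for (SketchField child : children) {
                if (child.retainsBuffer())
                    return true;
            }
            return false;
        }
    }

Under this model, the produce/join-group/sync-group schemas tested above would each contain at least one field that retains the buffer, matching the assertions.
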
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
index 2d6d05c..3feeff2 100755
--- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -36,6 +36,7 @@ import java.util.Collections;
 import java.util.Random;
 
 import static org.apache.kafka.common.utils.Utils.formatAddress;
+import static org.apache.kafka.common.utils.Utils.formatBytes;
 import static org.apache.kafka.common.utils.Utils.getHost;
 import static org.apache.kafka.common.utils.Utils.getPort;
 import static org.junit.Assert.assertArrayEquals;
@@ -78,6 +79,17 @@ public class UtilsTest {
     }
 
     @Test
+    public void testFormatBytes() {
+        assertEquals("-1", formatBytes(-1));
+        assertEquals("1023 B", formatBytes(1023));
+        assertEquals("1 KB", formatBytes(1024));
+        assertEquals("1024 KB", formatBytes((1024 * 1024) - 1));
+        assertEquals("1 MB", formatBytes(1024 * 1024));
+        assertEquals("1.1 MB", formatBytes((long) (1.1 * 1024 * 1024)));
+        assertEquals("10 MB", formatBytes(10 * 1024 * 1024));
+    }
+
+    @Test
     public void testJoin() {
         assertEquals("", Utils.join(Collections.emptyList(), ","));
         assertEquals("1", Utils.join(Arrays.asList("1"), ","));

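The expected strings pin down the behaviour: scale by powers of 1024, keep at most a couple of decimal places, and trim trailing zeros. A sketch consistent with these assertions (not necessarily the exact Utils implementation):

    import java.text.DecimalFormat;

    static String formatBytesSketch(long bytes) {
        if (bytes < 0)
            return String.valueOf(bytes); //matches the "-1" case above
        String[] suffixes = {" B", " KB", " MB", " GB", " TB"};
        double scaled = bytes;
        int i = 0;
        while (scaled >= 1024 && i < suffixes.length - 1) {
            scaled /= 1024;
            i++;
        }
        //"0.##" trims trailing zeros: 1.0 -> "1", 1.1000004 -> "1.1", 1023.999 -> "1024"
        return new DecimalFormat("0.##").format(scaled) + suffixes[i];
    }
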
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index bd71340..6b8dbaa 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -29,9 +29,10 @@ import kafka.server.QuotaId
 import kafka.utils.{Logging, NotNothing}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.InvalidRequestException
+import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.network.{ListenerName, Send}
 import org.apache.kafka.common.protocol.{ApiKeys, Protocol, SecurityProtocol}
-import org.apache.kafka.common.record.{RecordBatch, MemoryRecords}
+import org.apache.kafka.common.record.{MemoryRecords, RecordBatch}
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.Time
@@ -41,7 +42,7 @@ import scala.reflect.ClassTag
 
 object RequestChannel extends Logging {
   val AllDone = Request(processor = 1, connectionId = "2", Session(KafkaPrincipal.ANONYMOUS, InetAddress.getLocalHost),
-    buffer = shutdownReceive, startTimeNanos = 0, listenerName = new ListenerName(""),
+    buffer = shutdownReceive, memoryPool = MemoryPool.NONE, startTimeNanos = 0, listenerName = new ListenerName(""),
     securityProtocol = SecurityProtocol.PLAINTEXT)
   private val requestLogger = Logger.getLogger("kafka.request.logger")
 
@@ -56,10 +57,12 @@ object RequestChannel extends Logging {
     val sanitizedUser = QuotaId.sanitize(principal.getName)
   }
 
-  case class Request(processor: Int, connectionId: String, session: Session, private var buffer: ByteBuffer,
-                     startTimeNanos: Long, listenerName: ListenerName, securityProtocol: SecurityProtocol) {
+  case class Request(processor: Int, connectionId: String, session: Session, buffer: ByteBuffer,
+                     private val memoryPool: MemoryPool, startTimeNanos: Long, listenerName: ListenerName, 
+                     securityProtocol: SecurityProtocol) {
     // These need to be volatile because the readers are in the network thread and the writers are in the request
     // handler threads or the purgatory threads
+    @volatile var bufferReference = buffer
     @volatile var requestDequeueTimeNanos = -1L
     @volatile var apiLocalCompleteTimeNanos = -1L
     @volatile var responseCompleteTimeNanos = -1L
@@ -104,7 +107,12 @@ object RequestChannel extends Logging {
       else
         null
 
-    buffer = null
+    //most request types are parsed entirely into objects at this point. for those we can release the underlying buffer.
+    //some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference
+    //to the buffer. for those requests we cannot release the buffer early, but only when request processing is done.
+    if (!Protocol.requiresDelayedDeallocation(requestId)) {
+      dispose()
+    }
 
     def requestDesc(details: Boolean): String = {
       if (requestObj != null)
@@ -194,6 +202,13 @@ object RequestChannel extends Logging {
           .format(requestDesc(detailsEnabled), connectionId, totalTimeMs, requestQueueTimeMs, apiLocalTimeMs, apiRemoteTimeMs, apiThrottleTimeMs, responseQueueTimeMs, responseSendTimeMs, securityProtocol, session.principal, listenerName.value))
       }
     }
+
+    def dispose(): Unit = {
+      if (bufferReference != null) {
+        memoryPool.release(bufferReference)
+        bufferReference = null
+      }
+    }
   }
 
   object Response {

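Putting the pieces above together, the buffer lifecycle is: the network thread allocates the receive buffer from the pool, most requests release it as soon as they are parsed, and the rest hold it until the request handler's finally block calls dispose() (see KafkaRequestHandler below). A condensed hypothetical sketch in Java (parseRetainsBuffer() and handle() are illustrative stand-ins, not real Kafka methods):

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.memory.MemoryPool;

    class LifecycleSketch {
        void processOneRequest(MemoryPool pool, int requestSize) {
            ByteBuffer buffer = pool.tryAllocate(requestSize); //network thread; null means the selector mutes the channel and retries later
            if (buffer == null)
                return;
            if (!parseRetainsBuffer(buffer)) {
                pool.release(buffer); //early release, mirroring the requiresDelayedDeallocation check above
                buffer = null;
            }
            try {
                handle();
            } finally {
                if (buffer != null)
                    pool.release(buffer); //mirrors req.dispose() in KafkaRequestHandler's finally block
            }
        }

        boolean parseRetainsBuffer(ByteBuffer buffer) {
            return false; //stand-in: e.g. produce requests keep a slice of the buffer
        }

        void handle() {
            //stand-in for the actual request handling
        }
    }
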
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 2ba5553..e541015 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -32,7 +32,9 @@ import kafka.security.CredentialProvider
 import kafka.server.KafkaConfig
 import kafka.utils._
 import org.apache.kafka.common.errors.InvalidRequestException
+import org.apache.kafka.common.memory.{MemoryPool, SimpleMemoryPool}
 import org.apache.kafka.common.metrics._
+import org.apache.kafka.common.metrics.stats.Rate
 import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, ListenerName, Selectable, Send, Selector => KSelector}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.protocol.SecurityProtocol
@@ -61,6 +63,10 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
 
   this.logIdent = "[Socket Server on Broker " + config.brokerId + "], "
 
+  private val memoryPoolSensor = metrics.sensor("MemoryPoolUtilization")
+  private val memoryPoolDepletedPercentMetricName = metrics.metricName("MemoryPoolAvgDepletedPercent", "socket-server-metrics")
+  memoryPoolSensor.add(memoryPoolDepletedPercentMetricName, new Rate(TimeUnit.MILLISECONDS))
+  private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE
   val requestChannel = new RequestChannel(totalProcessorThreads, maxQueuedRequests)
   private val processors = new Array[Processor](totalProcessorThreads)
 
@@ -86,7 +92,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
         val processorEndIndex = processorBeginIndex + numProcessorThreads
 
         for (i <- processorBeginIndex until processorEndIndex)
-          processors(i) = newProcessor(i, connectionQuotas, listenerName, securityProtocol)
+          processors(i) = newProcessor(i, connectionQuotas, listenerName, securityProtocol, memoryPool)
 
         val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
           processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
@@ -109,7 +115,16 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
         }.sum / totalProcessorThreads
       }
     )
-
+    newGauge("MemoryPoolAvailable",
+      new Gauge[Long] {
+        def value = memoryPool.availableMemory()
+      }
+    )
+    newGauge("MemoryPoolUsed",
+      new Gauge[Long] {
+        def value = memoryPool.size() - memoryPool.availableMemory()
+      }
+    )
     info("Started " + acceptors.size + " acceptor threads")
   }
 
@@ -138,7 +153,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
 
   /* `protected` for test usage */
   protected[network] def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
-                                      securityProtocol: SecurityProtocol): Processor = {
+                                      securityProtocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
     new Processor(id,
       time,
       config.socketRequestMaxBytes,
@@ -149,7 +164,8 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
       securityProtocol,
       config,
       metrics,
-      credentialProvider
+      credentialProvider,
+      memoryPool
     )
   }
 
@@ -378,7 +394,8 @@ private[kafka] class Processor(val id: Int,
                                securityProtocol: SecurityProtocol,
                                config: KafkaConfig,
                                metrics: Metrics,
-                               credentialProvider: CredentialProvider) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
+                               credentialProvider: CredentialProvider,
+                               memoryPool: MemoryPool) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
 
   private object ConnectionId {
     def fromString(s: String): Option[ConnectionId] = s.split("-") match {
@@ -422,7 +439,8 @@ private[kafka] class Processor(val id: Int,
     metricTags,
     false,
     true,
-    ChannelBuilders.serverChannelBuilder(listenerName, securityProtocol, config, credentialProvider.credentialCache))
+    ChannelBuilders.serverChannelBuilder(listenerName, securityProtocol, config, credentialProvider.credentialCache),
+    memoryPool)
 
   override def run() {
     startupComplete()
@@ -517,7 +535,8 @@ private[kafka] class Processor(val id: Int,
 
         val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session,
           buffer = receive.payload, startTimeNanos = time.nanoseconds,
-          listenerName = listenerName, securityProtocol = securityProtocol)
+          listenerName = listenerName, securityProtocol = securityProtocol,
+          memoryPool = memoryPool)
         requestChannel.sendRequest(req)
         selector.mute(receive.source)
       } catch {

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 3941e17..a900e6d 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -52,6 +52,7 @@ object Defaults {
   val NumIoThreads = 8
   val BackgroundThreads = 10
   val QueuedMaxRequests = 500
+  val QueuedMaxRequestBytes = -1
 
   /************* Authorizer Configuration ***********/
   val AuthorizerClassName = ""
@@ -236,6 +237,7 @@ object KafkaConfig {
   val NumIoThreadsProp = "num.io.threads"
   val BackgroundThreadsProp = "background.threads"
   val QueuedMaxRequestsProp = "queued.max.requests"
+  val QueuedMaxBytesProp = "queued.max.request.bytes"
   val RequestTimeoutMsProp = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG
   /************* Authorizer Configuration ***********/
   val AuthorizerClassNameProp = "authorizer.class.name"
@@ -420,6 +422,7 @@ object KafkaConfig {
   val NumIoThreadsDoc = "The number of threads that the server uses for processing requests, which may include disk I/O"
   val BackgroundThreadsDoc = "The number of threads to use for various background processing tasks"
   val QueuedMaxRequestsDoc = "The number of queued requests allowed before blocking the network threads"
+  val QueuedMaxRequestBytesDoc = "The number of queued bytes allowed before no more requests are read"
   val RequestTimeoutMsDoc = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC
   /************* Authorizer Configuration ***********/
   val AuthorizerClassNameDoc = "The authorizer class that should be used for authorization"
@@ -684,6 +687,7 @@ object KafkaConfig {
       .define(NumIoThreadsProp, INT, Defaults.NumIoThreads, atLeast(1), HIGH, NumIoThreadsDoc)
       .define(BackgroundThreadsProp, INT, Defaults.BackgroundThreads, atLeast(1), HIGH, BackgroundThreadsDoc)
       .define(QueuedMaxRequestsProp, INT, Defaults.QueuedMaxRequests, atLeast(1), HIGH, QueuedMaxRequestsDoc)
+      .define(QueuedMaxBytesProp, LONG, Defaults.QueuedMaxRequestBytes, MEDIUM, QueuedMaxRequestBytesDoc)
       .define(RequestTimeoutMsProp, INT, Defaults.RequestTimeoutMs, HIGH, RequestTimeoutMsDoc)
 
       /************* Authorizer Configuration ***********/
@@ -900,6 +904,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
   val numNetworkThreads = getInt(KafkaConfig.NumNetworkThreadsProp)
   val backgroundThreads = getInt(KafkaConfig.BackgroundThreadsProp)
   val queuedMaxRequests = getInt(KafkaConfig.QueuedMaxRequestsProp)
+  val queuedMaxBytes = getLong(KafkaConfig.QueuedMaxBytesProp)
   val numIoThreads = getInt(KafkaConfig.NumIoThreadsProp)
   val messageMaxBytes = getInt(KafkaConfig.MessageMaxBytesProp)
   val requestTimeoutMs = getInt(KafkaConfig.RequestTimeoutMsProp)
@@ -1191,5 +1196,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
       s"Only GSSAPI mechanism is supported for inter-broker communication with SASL when inter.broker.protocol.version is set to $interBrokerProtocolVersionString")
     require(!interBrokerUsesSasl || saslEnabledMechanisms.contains(saslMechanismInterBrokerProtocol),
       s"${KafkaConfig.SaslMechanismInterBrokerProtocolProp} must be included in ${KafkaConfig.SaslEnabledMechanismsProp} when SASL is used for inter-broker communication")
+    require(queuedMaxBytes <= 0 || queuedMaxBytes >= socketRequestMaxBytes,
+      s"${KafkaConfig.QueuedMaxBytesProp} must be larger or equal to ${KafkaConfig.SocketRequestMaxBytesProp}")
   }
 }

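For illustration only, enabling the new bound in server.properties might look like this (the values are arbitrary examples); per the validation above, the value must either be non-positive (disabled, the default is -1) or at least socket.request.max.bytes:

    # bound the memory held by queued requests at 100 MB
    queued.max.request.bytes=104857600
    # must not exceed queued.max.request.bytes when the bound is enabled
    socket.request.max.bytes=104857600
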
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index feb07b8..512be67 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -40,9 +40,9 @@ class KafkaRequestHandler(id: Int,
   private val latch = new CountDownLatch(1)
 
   def run() {
-    while (true) {
+    while(true) {
+      var req : RequestChannel.Request = null
       try {
-        var req : RequestChannel.Request = null
         while (req == null) {
           // We use a single meter for aggregate idle percentage for the thread pool.
           // Since meter is calculated as total_recorded_value / time_window and
@@ -69,6 +69,9 @@ class KafkaRequestHandler(id: Int,
           latch.countDown()
           Exit.exit(e.statusCode)
         case e: Throwable => error("Exception when handling request", e)
+      } finally {
+        if (req != null)
+          req.dispose()
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index acf96e8..ed35269 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -29,6 +29,7 @@ import kafka.security.CredentialProvider
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.{ListenerName, NetworkSend, Send}
 import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
@@ -328,9 +329,9 @@ class SocketServerTest extends JUnitSuite {
     var conn: Socket = null
     val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, Time.SYSTEM, credentialProvider) {
       override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
-                                protocol: SecurityProtocol): Processor = {
+                                protocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
         new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas,
-          config.connectionsMaxIdleMs, listenerName, protocol, config, metrics, credentialProvider) {
+          config.connectionsMaxIdleMs, listenerName, protocol, config, metrics, credentialProvider, MemoryPool.NONE) {
           override protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send) {
             conn.close()
             super.sendResponse(response, responseSend)

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index ae1cfc0..38d4bb3 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -35,6 +35,7 @@ import kafka.server._
 import kafka.utils.{MockTime, TestUtils, ZkUtils}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.UnsupportedVersionException
+import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
@@ -395,7 +396,7 @@ class KafkaApisTest {
     val header = new RequestHeader(builder.apiKey.id, request.version, "", 0)
     val buffer = request.serialize(header)
     val session = Session(KafkaPrincipal.ANONYMOUS, InetAddress.getLocalHost)
-    (request, RequestChannel.Request(1, "1", session, buffer, 0, new ListenerName(""), SecurityProtocol.PLAINTEXT))
+    (request, RequestChannel.Request(1, "1", session, buffer, MemoryPool.NONE, 0, new ListenerName(""), SecurityProtocol.PLAINTEXT))
   }
 
   private def readResponse(api: ApiKeys, request: AbstractRequest, capturedResponse: Capture[RequestChannel.Response]): AbstractResponse = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 1b801de..dee6e87 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -545,6 +545,7 @@ class KafkaConfigTest {
         case KafkaConfig.NumIoThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
         case KafkaConfig.BackgroundThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
         case KafkaConfig.QueuedMaxRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
+        case KafkaConfig.QueuedMaxBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.RequestTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
 
         case KafkaConfig.AuthorizerClassNameProp => //ignore string


[2/2] kafka git commit: KAFKA-4602; KIP-72 - Allow putting a bound on memory consumed by Incoming requests

Posted by ju...@apache.org.
KAFKA-4602; KIP-72 - Allow putting a bound on memory consumed by Incoming requests

this is the initial implementation.

Author: radai-rosenblatt <ra...@gmail.com>

Reviewers: Ewen Cheslack-Postava <me...@ewencp.org>, Ismael Juma <is...@juma.me.uk>, Rajini Sivaram <ra...@googlemail.com>, Jun Rao <ju...@gmail.com>

Closes #2330 from radai-rosenblatt/broker-memory-pool-with-muting
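In outline: the broker's Selector now draws receive buffers from a bounded MemoryPool, and while the pool is depleted it mutes channels instead of buffering without limit. A minimal sketch of constructing such a selector, following SelectorTest above (metrics, time and channelBuilder are assumed to be in scope as in that test):

    import java.util.HashMap;
    import org.apache.kafka.common.memory.MemoryPool;
    import org.apache.kafka.common.memory.SimpleMemoryPool;

    //a bounded pool: 1 MB total, single allocations capped at 100 KB, non-strict, no sensor
    MemoryPool pool = new SimpleMemoryPool(1024 * 1024, 100 * 1024, false, null);

    //selector built as in the tests above; with the pool exhausted it stops reading
    //from channels (mutes them) until buffers are released back to the pool
    Selector selector = new Selector(NetworkReceive.UNLIMITED, 5000, metrics, time, "MetricGroup",
        new HashMap<String, String>(), true, false, channelBuilder, pool);
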


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/47ee8e95
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/47ee8e95
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/47ee8e95

Branch: refs/heads/trunk
Commit: 47ee8e954df62b9a79099e944ec4be29afe046f6
Parents: f15cdbc
Author: radai-rosenblatt <ra...@gmail.com>
Authored: Wed Jul 26 08:19:56 2017 +0200
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Jul 26 08:19:56 2017 +0200

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |   5 +
 .../memory/GarbageCollectedMemoryPool.java      | 168 +++++++++++++++++++
 .../apache/kafka/common/memory/MemoryPool.java  |  95 +++++++++++
 .../kafka/common/memory/SimpleMemoryPool.java   | 137 +++++++++++++++
 .../kafka/common/network/ChannelBuilder.java    |   7 +-
 .../kafka/common/network/KafkaChannel.java      |  55 +++++-
 .../kafka/common/network/NetworkReceive.java    |  50 +++++-
 .../common/network/PlaintextChannelBuilder.java |   6 +-
 .../common/network/PlaintextTransportLayer.java |   5 +
 .../apache/kafka/common/network/Receive.java    |  12 +-
 .../common/network/SaslChannelBuilder.java      |   6 +-
 .../apache/kafka/common/network/Selector.java   | 131 +++++++++++++--
 .../kafka/common/network/SslChannelBuilder.java |   6 +-
 .../kafka/common/network/SslTransportLayer.java |   5 +
 .../kafka/common/network/TransportLayer.java    |   6 +-
 .../kafka/common/protocol/ProtoUtils.java       |  47 ++++++
 .../apache/kafka/common/protocol/Protocol.java  |  47 ++++++
 .../kafka/common/protocol/SchemaVisitor.java    |  27 +++
 .../common/protocol/SchemaVisitorAdapter.java   |  38 +++++
 .../kafka/common/protocol/types/Field.java      |   5 +
 .../kafka/common/protocol/types/Schema.java     |   4 +-
 .../org/apache/kafka/common/utils/Utils.java    |  33 +++-
 .../memory/GarbageCollectedMemoryPoolTest.java  | 163 ++++++++++++++++++
 .../kafka/common/network/NioEchoServer.java     |   5 +-
 .../kafka/common/network/PlaintextSender.java   |  44 +++++
 .../kafka/common/network/SelectorTest.java      |  92 +++++++++-
 .../kafka/common/network/SslSelectorTest.java   |  95 ++++++++++-
 .../apache/kafka/common/network/SslSender.java  |  83 +++++++++
 .../common/network/SslTransportLayerTest.java   |   3 +-
 .../kafka/common/protocol/ProtoUtilsTest.java   |  35 ++++
 .../apache/kafka/common/utils/UtilsTest.java    |  12 ++
 .../scala/kafka/network/RequestChannel.scala    |  25 ++-
 .../main/scala/kafka/network/SocketServer.scala |  33 +++-
 .../main/scala/kafka/server/KafkaConfig.scala   |   7 +
 .../kafka/server/KafkaRequestHandler.scala      |   7 +-
 .../unit/kafka/network/SocketServerTest.scala   |   5 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala |   3 +-
 .../unit/kafka/server/KafkaConfigTest.scala     |   1 +
 38 files changed, 1446 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 1579a1c..4bd907b 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -43,6 +43,7 @@
   <allow pkg="org.apache.kafka.common.security" />
   <allow pkg="org.apache.kafka.common.utils" />
   <allow pkg="org.apache.kafka.common.errors" exact-match="true" />
+  <allow pkg="org.apache.kafka.common.memory" />
 
   <subpackage name="common">
     <disallow pkg="org.apache.kafka.clients" />
@@ -67,6 +68,10 @@
       <allow pkg="org.apache.kafka.common.metrics" />
     </subpackage>
 
+    <subpackage name="memory">
+      <allow pkg="org.apache.kafka.common.metrics" />
+    </subpackage>
+
     <subpackage name="network">
       <allow pkg="org.apache.kafka.common.security.auth" />
       <allow pkg="org.apache.kafka.common.protocol" />

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPool.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPool.java b/clients/src/main/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPool.java
new file mode 100644
index 0000000..041d1c2
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPool.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.memory;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Utils;
+
+import java.lang.ref.ReferenceQueue;
+import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * An extension of SimpleMemoryPool that tracks allocated buffers and logs an error when they "leak"
+ * (when they are garbage-collected without having been release()ed).
+ * THIS IMPLEMENTATION IS A DEVELOPMENT/DEBUGGING AID AND IS NOT MEANT FOR PRODUCTION USE.
+ */
+public class GarbageCollectedMemoryPool extends SimpleMemoryPool implements AutoCloseable {
+
+    private final ReferenceQueue<ByteBuffer> garbageCollectedBuffers = new ReferenceQueue<>();
+    //serves 2 purposes - 1st it keeps the ref objects reachable (which is a requirement for them
+    //to ever be enqueued), 2nd it keeps some (small) metadata for every buffer allocated
+    private final Map<BufferReference, BufferMetadata> buffersInFlight = new ConcurrentHashMap<>();
+    private final GarbageCollectionListener gcListener = new GarbageCollectionListener();
+    private final Thread gcListenerThread;
+    private volatile boolean alive = true;
+
+    public GarbageCollectedMemoryPool(long sizeBytes, int maxSingleAllocationSize, boolean strict, Sensor oomPeriodSensor) {
+        super(sizeBytes, maxSingleAllocationSize, strict, oomPeriodSensor);
+        this.alive = true;
+        this.gcListenerThread = new Thread(gcListener, "memory pool GC listener");
+        this.gcListenerThread.setDaemon(true); //so we don't need to worry about shutdown
+        this.gcListenerThread.start();
+    }
+
+    @Override
+    protected void bufferToBeReturned(ByteBuffer justAllocated) {
+        BufferReference ref = new BufferReference(justAllocated, garbageCollectedBuffers);
+        BufferMetadata metadata = new BufferMetadata(justAllocated.capacity());
+        if (buffersInFlight.put(ref, metadata) != null)
+            //this is a bug. it means either 2 different co-existing buffers got
+            //the same identity or we failed to register a released/GC'ed buffer
+            throw new IllegalStateException("allocated buffer identity " + ref.hashCode + " already registered as in use?!");
+
+        log.trace("allocated buffer of size {} and identity {}", sizeBytes, ref.hashCode);
+    }
+
+    @Override
+    protected void bufferToBeReleased(ByteBuffer justReleased) {
+        BufferReference ref = new BufferReference(justReleased); //used for lookup only
+        BufferMetadata metadata = buffersInFlight.remove(ref);
+        if (metadata == null)
+            //it's impossible for the buffer to have already been GC'ed (because we have a hard ref to it
+            //in the function arg) so this means either a double free or not our buffer.
+            throw new IllegalArgumentException("returned buffer " + ref.hashCode + " was never allocated by this pool");
+        if (metadata.sizeBytes != justReleased.capacity()) {
+            //this is a bug
+            throw new IllegalStateException("buffer " + ref.hashCode + " has capacity " + justReleased.capacity() + " but recorded as " + metadata.sizeBytes);
+        }
+        log.trace("released buffer of size {} and identity {}", metadata.sizeBytes, ref.hashCode);
+    }
+
+    @Override
+    public void close() throws Exception {
+        alive = false;
+        gcListenerThread.interrupt();
+    }
+
+    private class GarbageCollectionListener implements Runnable {
+        @Override
+        public void run() {
+            while (alive) {
+                try {
+                    BufferReference ref = (BufferReference) garbageCollectedBuffers.remove(); //blocks
+                    ref.clear();
+                    //this cannot race with a release() call because an object is either reachable or not,
+                    //release() can only happen before its GC'ed, and enqueue can only happen after.
+                    //if the ref was enqueued it must then not have been released
+                    BufferMetadata metadata = buffersInFlight.remove(ref);
+
+                    if (metadata == null) {
+                        //it can happen rarely that the buffer was release()ed properly (so no metadata) and yet
+                        //the reference object to it remains reachable for a short period of time after release()
+                        //and hence gets enqueued. this is because we keep refs in a ConcurrentHashMap which cleans
+                        //up keys lazily.
+                        continue;
+                    }
+
+                    availableMemory.addAndGet(metadata.sizeBytes);
+                    log.error("Reclaimed buffer of size {} and identity {} that was not properly release()ed. This is a bug.", metadata.sizeBytes, ref.hashCode);
+                } catch (InterruptedException e) {
+                    log.debug("interrupted", e);
+                    //ignore, we're a daemon thread
+                }
+            }
+            log.info("GC listener shutting down");
+        }
+    }
+
+    private static final class BufferMetadata {
+        private final int sizeBytes;
+
+        private BufferMetadata(int sizeBytes) {
+            this.sizeBytes = sizeBytes;
+        }
+    }
+
+    private static final class BufferReference extends WeakReference<ByteBuffer> {
+        private final int hashCode;
+
+        private BufferReference(ByteBuffer referent) { //used for lookup purposes only - no queue required.
+            this(referent, null);
+        }
+
+        private BufferReference(ByteBuffer referent, ReferenceQueue<? super ByteBuffer> q) {
+            super(referent, q);
+            hashCode = System.identityHashCode(referent);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) { //this is important to find leaked buffers (by ref identity)
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            BufferReference that = (BufferReference) o;
+            if (hashCode != that.hashCode) {
+                return false;
+            }
+            ByteBuffer thisBuf = get();
+            if (thisBuf == null) {
+                //our buffer has already been GC'ed, yet "that" is not us. so not same buffer
+                return false;
+            }
+            ByteBuffer thatBuf = that.get();
+            return thisBuf == thatBuf;
+        }
+
+        @Override
+        public int hashCode() {
+            return hashCode;
+        }
+    }
+
+    @Override
+    public String toString() {
+        long allocated = sizeBytes - availableMemory.get();
+        return "GarbageCollectedMemoryPool{" + Utils.formatBytes(allocated) + "/" + Utils.formatBytes(sizeBytes) + " used in " + buffersInFlight.size() + " buffers}";
+    }
+}

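A possible way to employ this pool as a leak detector during development (a sketch; the GC listener thread logs an error and restores the pool's capacity when an unreleased buffer is collected):

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.memory.GarbageCollectedMemoryPool;

    public class LeakDetectionSketch {
        public static void main(String[] args) throws Exception {
            try (GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(1024 * 1024, 64 * 1024, false, null)) {
                ByteBuffer buf = pool.tryAllocate(1024);
                //"forget" to call pool.release(buf) ...
                buf = null;
                System.gc(); //if the buffer is collected, the listener logs the leak and reclaims the memory
                Thread.sleep(100); //give the daemon listener thread a chance to run
            }
        }
    }
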
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/memory/MemoryPool.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/memory/MemoryPool.java b/clients/src/main/java/org/apache/kafka/common/memory/MemoryPool.java
new file mode 100644
index 0000000..5887816
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/memory/MemoryPool.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.memory;
+
+import java.nio.ByteBuffer;
+
+
+/**
+ * A common memory pool interface for non-blocking pools.
+ * Every buffer returned from {@link #tryAllocate(int)} must always be {@link #release(ByteBuffer) released}.
+ */
+public interface MemoryPool {
+    MemoryPool NONE = new MemoryPool() {
+        @Override
+        public ByteBuffer tryAllocate(int sizeBytes) {
+            return ByteBuffer.allocate(sizeBytes);
+        }
+
+        @Override
+        public void release(ByteBuffer previouslyAllocated) {
+            //nop
+        }
+
+        @Override
+        public long size() {
+            return Long.MAX_VALUE;
+        }
+
+        @Override
+        public long availableMemory() {
+            return Long.MAX_VALUE;
+        }
+
+        @Override
+        public boolean isOutOfMemory() {
+            return false;
+        }
+
+        @Override
+        public String toString() {
+            return "NONE";
+        }
+    };
+
+    /**
+     * Tries to acquire a ByteBuffer of the specified size
+     * @param sizeBytes size required
+     * @return a ByteBuffer (which later needs to be release()ed), or null if no memory available.
+     *         the buffer will be of the exact size requested, even if backed by a larger chunk of memory
+     */
+    ByteBuffer tryAllocate(int sizeBytes);
+
+    /**
+     * Returns a previously allocated buffer to the pool.
+     * @param previouslyAllocated a buffer previously returned from tryAllocate()
+     */
+    void release(ByteBuffer previouslyAllocated);
+
+    /**
+     * Returns the total size of this pool
+     * @return total size, in bytes
+     */
+    long size();
+
+    /**
+     * Returns the amount of memory available for allocation by this pool.
+     * NOTE: result may be negative (pools may over-allocate to avoid starvation issues)
+     * @return bytes available
+     */
+    long availableMemory();
+
+    /**
+     * Returns true if the pool cannot currently allocate any more buffers
+     * - meaning total outstanding buffers meets or exceeds pool size and
+     * some would need to be released before further allocations are possible.
+     *
+     * This is equivalent to availableMemory() <= 0
+     * @return true if out of memory
+     */
+    boolean isOutOfMemory();
+}
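
To make the contract concrete, here is a minimal caller sketch (the class and
method names are hypothetical) showing that every successful tryAllocate()
must be paired with a release():

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.memory.MemoryPool;

    class MemoryPoolContractSketch {
        static void readOneBuffer(MemoryPool pool) {
            ByteBuffer buffer = pool.tryAllocate(1024);
            if (buffer == null)
                return; //pool under pressure; caller should retry on a later pass
            try {
                //... fill and process the buffer ...
            } finally {
                pool.release(buffer); //every successful tryAllocate() needs a matching release()
            }
        }
    }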

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/memory/SimpleMemoryPool.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/memory/SimpleMemoryPool.java b/clients/src/main/java/org/apache/kafka/common/memory/SimpleMemoryPool.java
new file mode 100644
index 0000000..f1ab8f7
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/memory/SimpleMemoryPool.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.memory;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A simple pool implementation that merely bounds the total outstanding memory.
+ * Any buffer allocated must always be released; otherwise the memory is never marked as reclaimed (and effectively leaks).
+ */
+public class SimpleMemoryPool implements MemoryPool {
+    protected final Logger log = LoggerFactory.getLogger(getClass()); //subclass-friendly
+
+    protected final long sizeBytes;
+    protected final boolean strict;
+    protected final AtomicLong availableMemory;
+    protected final int maxSingleAllocationSize;
+    protected final AtomicLong startOfNoMemPeriod = new AtomicLong(); //nanoseconds
+    protected volatile Sensor oomTimeSensor;
+
+    public SimpleMemoryPool(long sizeInBytes, int maxSingleAllocationBytes, boolean strict, Sensor oomPeriodSensor) {
+        if (sizeInBytes <= 0 || maxSingleAllocationBytes <= 0 || maxSingleAllocationBytes > sizeInBytes)
+            throw new IllegalArgumentException("must provide a positive size and max single allocation size smaller than size."
+                + "provided " + sizeInBytes + " and " + maxSingleAllocationBytes + " respectively");
+        this.sizeBytes = sizeInBytes;
+        this.strict = strict;
+        this.availableMemory = new AtomicLong(sizeInBytes);
+        this.maxSingleAllocationSize = maxSingleAllocationBytes;
+        this.oomTimeSensor = oomPeriodSensor;
+    }
+
+    @Override
+    public ByteBuffer tryAllocate(int sizeBytes) {
+        if (sizeBytes < 1)
+            throw new IllegalArgumentException("requested size " + sizeBytes + "<=0");
+        if (sizeBytes > maxSingleAllocationSize)
+            throw new IllegalArgumentException("requested size " + sizeBytes + " is larger than maxSingleAllocationSize " + maxSingleAllocationSize);
+
+        long available;
+        boolean success = false;
+        //in strict mode we will only allocate memory if we have at least the size required.
+        //in non-strict mode we will allocate memory if we have _any_ memory available (so available memory
+        //can dip below zero, and total allocated memory can approach pool size + maxSingleAllocationSize)
+        long threshold = strict ? sizeBytes : 1;
+        while ((available = availableMemory.get()) >= threshold) {
+            success = availableMemory.compareAndSet(available, available - sizeBytes);
+            if (success)
+                break;
+        }
+
+        if (success) {
+            maybeRecordEndOfDrySpell();
+        } else {
+            if (oomTimeSensor != null) {
+                startOfNoMemPeriod.compareAndSet(0, System.nanoTime());
+            }
+            log.trace("refused to allocate buffer of size {}", sizeBytes);
+            return null;
+        }
+
+        ByteBuffer allocated = ByteBuffer.allocate(sizeBytes);
+        bufferToBeReturned(allocated);
+        return allocated;
+    }
+
+    @Override
+    public void release(ByteBuffer previouslyAllocated) {
+        if (previouslyAllocated == null)
+            throw new IllegalArgumentException("provided null buffer");
+
+        bufferToBeReleased(previouslyAllocated);
+        availableMemory.addAndGet(previouslyAllocated.capacity());
+        maybeRecordEndOfDrySpell();
+    }
+
+    @Override
+    public long size() {
+        return sizeBytes;
+    }
+
+    @Override
+    public long availableMemory() {
+        return availableMemory.get();
+    }
+
+    @Override
+    public boolean isOutOfMemory() {
+        return availableMemory.get() <= 0;
+    }
+
+    //allows subclasses to do their own bookkeeping (and validation) _before_ memory is returned to client code.
+    protected void bufferToBeReturned(ByteBuffer justAllocated) {
+        log.trace("allocated buffer of size {} ", justAllocated.capacity());
+    }
+
+    //allows subclasses to do their own bookkeeping (and validation) _before_ memory is marked as reclaimed.
+    protected void bufferToBeReleased(ByteBuffer justReleased) {
+        log.trace("released buffer of size {}", justReleased.capacity());
+    }
+
+    @Override
+    public String toString() {
+        long allocated = sizeBytes - availableMemory.get();
+        return "SimpleMemoryPool{" + Utils.formatBytes(allocated) + "/" + Utils.formatBytes(sizeBytes) + " used}";
+    }
+
+    protected void maybeRecordEndOfDrySpell() {
+        if (oomTimeSensor != null) {
+            long startOfDrySpell = startOfNoMemPeriod.getAndSet(0);
+            if (startOfDrySpell != 0) {
+                //record how long we were refusing allocation requests
+                oomTimeSensor.record((System.nanoTime() - startOfDrySpell) / 1000000.0); //fractional (double) millis
+            }
+        }
+    }
+}
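
A small usage sketch of the non-strict mode described above (the pool sizes and
wrapper class are hypothetical; the commented outputs were derived by hand):

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.memory.SimpleMemoryPool;

    class SimpleMemoryPoolSketch {
        public static void main(String[] args) {
            //1 KB pool, 1 KB max single allocation, non-strict, no oom-time sensor
            SimpleMemoryPool pool = new SimpleMemoryPool(1024, 1024, false, null);
            ByteBuffer first = pool.tryAllocate(1000);  //succeeds, 24 bytes left
            ByteBuffer second = pool.tryAllocate(1000); //non-strict: succeeds, availableMemory() goes negative
            ByteBuffer third = pool.tryAllocate(1000);  //null - availableMemory() <= 0
            System.out.println(third);                  //"null"
            System.out.println(pool);                   //"SimpleMemoryPool{1.95 KB/1 KB used}"
            pool.release(first);
            pool.release(second);
        }
    }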

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java
index 0e5ca78..6c8890a 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java
@@ -20,6 +20,8 @@ import java.util.Map;
 import java.nio.channels.SelectionKey;
 
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.memory.MemoryPool;
+
 
 /**
  * A ChannelBuilder interface to build Channel based on configs
@@ -36,10 +38,11 @@ public interface ChannelBuilder extends AutoCloseable {
      * returns a Channel with TransportLayer and Authenticator configured.
      * @param  id  channel id
      * @param  key SelectionKey
-     * @param  maxReceiveSize
+     * @param  maxReceiveSize max size of a single receive buffer to allocate
+     * @param  memoryPool memory pool from which to allocate buffers, or null for none
      * @return KafkaChannel
      */
-    KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException;
+    KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException;
 
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
index b563c4a..1e76c43 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -25,6 +25,8 @@ import java.nio.channels.SelectionKey;
 
 import java.security.Principal;
 
+import java.util.Objects;
+import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.utils.Utils;
 
 public class KafkaChannel {
@@ -35,6 +37,7 @@ public class KafkaChannel {
     // The values are read and reset after each response is sent.
     private long networkThreadTimeNanos;
     private final int maxReceiveSize;
+    private final MemoryPool memoryPool;
     private NetworkReceive receive;
     private Send send;
     // Track connection and mute state of channels to enable outstanding requests on channels to be
@@ -43,12 +46,13 @@ public class KafkaChannel {
     private boolean muted;
     private ChannelState state;
 
-    public KafkaChannel(String id, TransportLayer transportLayer, Authenticator authenticator, int maxReceiveSize) throws IOException {
+    public KafkaChannel(String id, TransportLayer transportLayer, Authenticator authenticator, int maxReceiveSize, MemoryPool memoryPool) throws IOException {
         this.id = id;
         this.transportLayer = transportLayer;
         this.authenticator = authenticator;
         this.networkThreadTimeNanos = 0L;
         this.maxReceiveSize = maxReceiveSize;
+        this.memoryPool = memoryPool;
         this.disconnected = false;
         this.muted = false;
         this.state = ChannelState.NOT_CONNECTED;
@@ -56,7 +60,7 @@ public class KafkaChannel {
 
     public void close() throws IOException {
         this.disconnected = true;
-        Utils.closeAll(transportLayer, authenticator);
+        Utils.closeAll(transportLayer, authenticator, receive);
     }
 
     /**
@@ -106,13 +110,16 @@ public class KafkaChannel {
         return id;
     }
 
-    public void mute() {
+    /**
+     * Externally muting a channel should be done via the selector to ensure proper state handling.
+     */
+    void mute() {
         if (!disconnected)
             transportLayer.removeInterestOps(SelectionKey.OP_READ);
         muted = true;
     }
 
-    public void unmute() {
+    void unmute() {
         if (!disconnected)
             transportLayer.addInterestOps(SelectionKey.OP_READ);
         muted = false;
@@ -125,6 +132,17 @@ public class KafkaChannel {
         return muted;
     }
 
+    public boolean isInMutableState() {
+        //some requests do not require memory, so if we do not know what the current (or future) request is
+        //(receive == null) we don't mute. we also don't mute if whatever memory is required has already been
+        //successfully allocated (if none is required for the currently-being-read request,
+        //receive.memoryAllocated() is expected to return true)
+        if (receive == null || receive.memoryAllocated())
+            return false;
+        //also cannot mute if underlying transport is not in the ready state
+        return transportLayer.ready();
+    }
+
     public boolean ready() {
         return transportLayer.ready() && authenticator.complete();
     }
@@ -161,7 +179,7 @@ public class KafkaChannel {
         NetworkReceive result = null;
 
         if (receive == null) {
-            receive = new NetworkReceive(maxReceiveSize, id);
+            receive = new NetworkReceive(maxReceiveSize, id, memoryPool);
         }
 
         receive(receive);
@@ -169,6 +187,9 @@ public class KafkaChannel {
             receive.payload().rewind();
             result = receive;
             receive = null;
+        } else if (receive.requiredMemoryAmountKnown() && !receive.memoryAllocated() && isInMutableState()) {
+            //pool must be out of memory, mute ourselves.
+            mute();
         }
         return result;
     }
@@ -210,4 +231,28 @@ public class KafkaChannel {
 
         return send.completed();
     }
+
+    /**
+     * @return true if the underlying transport has bytes remaining to be read from any intermediate buffers.
+     */
+    public boolean hasBytesBuffered() {
+        return transportLayer.hasBytesBuffered();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        KafkaChannel that = (KafkaChannel) o;
+        return Objects.equals(id, that.id);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(id);
+    }
 }
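
A rough sketch of what a caller of KafkaChannel.read() sees under memory
pressure (the drain/handle helpers are hypothetical; within this patch the
real caller is the selector's read path):

    import java.io.IOException;
    import org.apache.kafka.common.network.KafkaChannel;
    import org.apache.kafka.common.network.NetworkReceive;

    class ChannelReadSketch {
        //returns true if the channel muted itself due to memory pressure
        static boolean drain(KafkaChannel channel) throws IOException {
            NetworkReceive receive;
            while ((receive = channel.read()) != null)
                handle(receive); //a complete size-delimited request was assembled
            //read() returning null can mean "no more data" OR "pool exhausted";
            //in the latter case the channel has muted itself until the pool recovers
            return channel.isMute();
        }

        static void handle(NetworkReceive receive) {
            //hand off to request processing
        }
    }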

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
index 582f064..564fbcd 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
@@ -21,6 +21,9 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.ScatteringByteChannel;
+import org.apache.kafka.common.memory.MemoryPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A size delimited Receive that consists of a 4 byte network-ordered size N followed by N bytes of content
@@ -29,10 +32,14 @@ public class NetworkReceive implements Receive {
 
     public final static String UNKNOWN_SOURCE = "";
     public final static int UNLIMITED = -1;
+    private static final Logger log = LoggerFactory.getLogger(NetworkReceive.class);
+    private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
 
     private final String source;
     private final ByteBuffer size;
     private final int maxSize;
+    private final MemoryPool memoryPool;
+    private int requestedBufferSize = -1;
     private ByteBuffer buffer;
 
 
@@ -41,6 +48,7 @@ public class NetworkReceive implements Receive {
         this.buffer = buffer;
         this.size = null;
         this.maxSize = UNLIMITED;
+        this.memoryPool = MemoryPool.NONE;
     }
 
     public NetworkReceive(String source) {
@@ -48,6 +56,7 @@ public class NetworkReceive implements Receive {
         this.size = ByteBuffer.allocate(4);
         this.buffer = null;
         this.maxSize = UNLIMITED;
+        this.memoryPool = MemoryPool.NONE;
     }
 
     public NetworkReceive(int maxSize, String source) {
@@ -55,6 +64,15 @@ public class NetworkReceive implements Receive {
         this.size = ByteBuffer.allocate(4);
         this.buffer = null;
         this.maxSize = maxSize;
+        this.memoryPool = MemoryPool.NONE;
+    }
+
+    public NetworkReceive(int maxSize, String source, MemoryPool memoryPool) {
+        this.source = source;
+        this.size = ByteBuffer.allocate(4);
+        this.buffer = null;
+        this.maxSize = maxSize;
+        this.memoryPool = memoryPool;
     }
 
     public NetworkReceive() {
@@ -68,13 +86,32 @@ public class NetworkReceive implements Receive {
 
     @Override
     public boolean complete() {
-        return !size.hasRemaining() && !buffer.hasRemaining();
+        return !size.hasRemaining() && buffer != null && !buffer.hasRemaining();
     }
 
     public long readFrom(ScatteringByteChannel channel) throws IOException {
         return readFromReadableChannel(channel);
     }
 
+    @Override
+    public boolean requiredMemoryAmountKnown() {
+        return requestedBufferSize != -1;
+    }
+
+    @Override
+    public boolean memoryAllocated() {
+        return buffer != null;
+    }
+
+
+    @Override
+    public void close() throws IOException {
+        if (buffer != null && buffer != EMPTY_BUFFER) {
+            memoryPool.release(buffer);
+            buffer = null;
+        }
+    }
+
     // Need a method to read from ReadableByteChannel because BlockingChannel requires read with timeout
     // See: http://stackoverflow.com/questions/2866557/timeout-for-socketchannel-doesnt-work
     // This can go away after we get rid of BlockingChannel
@@ -93,10 +130,17 @@ public class NetworkReceive implements Receive {
                     throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
                 if (maxSize != UNLIMITED && receiveSize > maxSize)
                     throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
-
-                this.buffer = ByteBuffer.allocate(receiveSize);
+                requestedBufferSize = receiveSize; //may be 0 for some payloads (SASL)
+                if (receiveSize == 0) {
+                    buffer = EMPTY_BUFFER;
+                }
             }
         }
+        if (buffer == null && requestedBufferSize != -1) { //we know the size we want but haven't been able to allocate it yet
+            buffer = memoryPool.tryAllocate(requestedBufferSize);
+            if (buffer == null)
+                log.trace("Broker low on memory - could not allocate buffer of size {} for source {}", requestedBufferSize, source);
+        }
         if (buffer != null) {
             int bytesRead = channel.read(buffer);
             if (bytesRead < 0)
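
For reference, the size-delimited frame that NetworkReceive consumes can be
produced as below (a hypothetical sender sketch using only standard java.io):

    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    class FrameWriterSketch {
        //writes a frame in the format NetworkReceive expects:
        //a 4-byte big-endian size N, followed by N bytes of payload
        static void writeFrame(OutputStream out, byte[] payload) throws IOException {
            DataOutputStream dos = new DataOutputStream(out);
            dos.writeInt(payload.length); //may legitimately be 0 (e.g. some SASL exchanges)
            dos.write(payload);
            dos.flush();
        }
    }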

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
index fb4f6ba..ad63671 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.network;
 import java.nio.channels.SelectionKey;
 import java.util.Map;
 
+import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.security.auth.PrincipalBuilder;
 import org.apache.kafka.common.KafkaException;
 
@@ -39,12 +40,13 @@ public class PlaintextChannelBuilder implements ChannelBuilder {
         }
     }
 
-    public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException {
+    @Override
+    public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException {
         try {
             PlaintextTransportLayer transportLayer = new PlaintextTransportLayer(key);
             Authenticator authenticator = new DefaultAuthenticator();
             authenticator.configure(transportLayer, this.principalBuilder, this.configs);
-            return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
+            return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize, memoryPool != null ? memoryPool : MemoryPool.NONE);
         } catch (Exception e) {
             log.warn("Failed to create channel due to ", e);
             throw new KafkaException(e);

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
index 871b7ac..11c9565 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
@@ -214,6 +214,11 @@ public class PlaintextTransportLayer implements TransportLayer {
     }
 
     @Override
+    public boolean hasBytesBuffered() {
+        return false;
+    }
+
+    @Override
     public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
         return fileChannel.transferTo(position, count, socketChannel);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/network/Receive.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Receive.java b/clients/src/main/java/org/apache/kafka/common/network/Receive.java
index b7bbdb4..3bc2761 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Receive.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Receive.java
@@ -16,13 +16,14 @@
  */
 package org.apache.kafka.common.network;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.nio.channels.ScatteringByteChannel;
 
 /**
  * This interface models the in-progress reading of data from a channel to a source identified by an integer id
  */
-public interface Receive {
+public interface Receive extends Closeable {
 
     /**
      * The numeric id of the source from which we are receiving data.
@@ -42,4 +43,13 @@ public interface Receive {
      */
     long readFrom(ScatteringByteChannel channel) throws IOException;
 
+    /**
+     * Do we know yet how much memory we require to fully read this receive?
+     */
+    boolean requiredMemoryAmountKnown();
+
+    /**
+     * Has the underlying memory required to complete reading been allocated yet?
+     */
+    boolean memoryAllocated();
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index 33a9f4d..342592b 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.security.JaasContext;
 import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
 import org.apache.kafka.common.security.authenticator.CredentialCache;
@@ -100,7 +101,8 @@ public class SaslChannelBuilder implements ChannelBuilder {
         }
     }
 
-    public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException {
+    @Override
+    public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException {
         try {
             SocketChannel socketChannel = (SocketChannel) key.channel();
             TransportLayer transportLayer = buildTransportLayer(id, key, socketChannel);
@@ -114,7 +116,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
                         socketChannel.socket().getInetAddress().getHostName(), clientSaslMechanism, handshakeRequestEnable);
             // Both authenticators don't use `PrincipalBuilder`, so we pass `null` for now. Reconsider if this changes.
             authenticator.configure(transportLayer, null, this.configs);
-            return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
+            return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize, memoryPool != null ? memoryPool : MemoryPool.NONE);
         } catch (Exception e) {
             log.info("Failed to create channel due to ", e);
             throw new KafkaException(e);

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index da3de80..38226f9 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -26,6 +26,7 @@ import java.nio.channels.SocketChannel;
 import java.nio.channels.UnresolvedAddressException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.HashMap;
@@ -38,6 +39,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.MetricName;
@@ -87,11 +89,14 @@ public class Selector implements Selectable, AutoCloseable {
 
     private final java.nio.channels.Selector nioSelector;
     private final Map<String, KafkaChannel> channels;
+    private final Set<KafkaChannel> explicitlyMutedChannels;
+    private boolean outOfMemory;
     private final List<Send> completedSends;
     private final List<NetworkReceive> completedReceives;
     private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;
     private final Set<SelectionKey> immediatelyConnectedKeys;
     private final Map<String, KafkaChannel> closingChannels;
+    private Set<SelectionKey> keysWithBufferedRead;
     private final Map<String, ChannelState> disconnected;
     private final List<String> connected;
     private final List<String> failedSends;
@@ -101,6 +106,11 @@ public class Selector implements Selectable, AutoCloseable {
     private final int maxReceiveSize;
     private final boolean recordTimePerConnection;
     private final IdleExpiryManager idleExpiryManager;
+    private final MemoryPool memoryPool;
+    private final long lowMemThreshold;
+    //indicates if the previous call to poll was able to make progress in reading already-buffered data.
+    //this is used to prevent tight loops when memory is not available to read any more data
+    private boolean madeReadProgressLastPoll = true;
 
     /**
      * Create a new nioSelector
@@ -122,7 +132,8 @@ public class Selector implements Selectable, AutoCloseable {
                     Map<String, String> metricTags,
                     boolean metricsPerConnection,
                     boolean recordTimePerConnection,
-                    ChannelBuilder channelBuilder) {
+                    ChannelBuilder channelBuilder,
+                    MemoryPool memoryPool) {
         try {
             this.nioSelector = java.nio.channels.Selector.open();
         } catch (IOException e) {
@@ -131,11 +142,14 @@ public class Selector implements Selectable, AutoCloseable {
         this.maxReceiveSize = maxReceiveSize;
         this.time = time;
         this.channels = new HashMap<>();
+        this.explicitlyMutedChannels = new HashSet<>();
+        this.outOfMemory = false;
         this.completedSends = new ArrayList<>();
         this.completedReceives = new ArrayList<>();
         this.stagedReceives = new HashMap<>();
         this.immediatelyConnectedKeys = new HashSet<>();
         this.closingChannels = new HashMap<>();
+        this.keysWithBufferedRead = new HashSet<>();
         this.connected = new ArrayList<>();
         this.disconnected = new HashMap<>();
         this.failedSends = new ArrayList<>();
@@ -143,6 +157,8 @@ public class Selector implements Selectable, AutoCloseable {
         this.channelBuilder = channelBuilder;
         this.recordTimePerConnection = recordTimePerConnection;
         this.idleExpiryManager = connectionMaxIdleMs < 0 ? null : new IdleExpiryManager(time, connectionMaxIdleMs);
+        this.memoryPool = memoryPool;
+        this.lowMemThreshold = (long) (0.1 * this.memoryPool.size());
     }
 
     public Selector(int maxReceiveSize,
@@ -153,7 +169,7 @@ public class Selector implements Selectable, AutoCloseable {
             Map<String, String> metricTags,
             boolean metricsPerConnection,
             ChannelBuilder channelBuilder) {
-        this(maxReceiveSize, connectionMaxIdleMs, metrics, time, metricGrpPrefix, metricTags, metricsPerConnection, false, channelBuilder);
+        this(maxReceiveSize, connectionMaxIdleMs, metrics, time, metricGrpPrefix, metricTags, metricsPerConnection, false, channelBuilder, MemoryPool.NONE);
     }
 
     public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, ChannelBuilder channelBuilder) {
@@ -200,7 +216,7 @@ public class Selector implements Selectable, AutoCloseable {
         SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
         KafkaChannel channel;
         try {
-            channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
+            channel = channelBuilder.buildChannel(id, key, maxReceiveSize, memoryPool);
         } catch (Exception e) {
             try {
                 socketChannel.close();
@@ -227,7 +243,7 @@ public class Selector implements Selectable, AutoCloseable {
      */
     public void register(String id, SocketChannel socketChannel) throws ClosedChannelException {
         SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_READ);
-        KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
+        KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize, memoryPool);
         key.attach(channel);
         this.channels.put(id, channel);
     }
@@ -311,20 +327,46 @@ public class Selector implements Selectable, AutoCloseable {
         if (timeout < 0)
             throw new IllegalArgumentException("timeout should be >= 0");
 
+        boolean madeReadProgressLastCall = madeReadProgressLastPoll;
         clear();
 
-        if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
+        boolean dataInBuffers = !keysWithBufferedRead.isEmpty();
+
+        if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty() || (madeReadProgressLastCall && dataInBuffers))
             timeout = 0;
 
+        if (!memoryPool.isOutOfMemory() && outOfMemory) {
+            //we have recovered from memory pressure. unmute any channel not explicitly muted for other reasons
+            log.trace("Broker no longer low on memory - unmuting incoming sockets");
+            for (KafkaChannel channel : channels.values()) {
+                if (channel.isInMutableState() && !explicitlyMutedChannels.contains(channel)) {
+                    channel.unmute();
+                }
+            }
+            outOfMemory = false;
+        }
+
         /* check ready keys */
         long startSelect = time.nanoseconds();
-        int readyKeys = select(timeout);
+        int numReadyKeys = select(timeout);
         long endSelect = time.nanoseconds();
         this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
 
-        if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
-            pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
+        if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
+            Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();
+            keysWithBufferedRead.removeAll(readyKeys); //so no channel gets polled twice
+
+            //poll from channels that have buffered data (but nothing more from the underlying socket)
+            if (!keysWithBufferedRead.isEmpty()) {
+                Set<SelectionKey> toPoll = keysWithBufferedRead;
+                keysWithBufferedRead = new HashSet<>(); //poll() calls will repopulate if needed
+                pollSelectionKeys(toPoll, false, endSelect);
+            }
+            //poll from channels where the underlying socket has more data
+            pollSelectionKeys(readyKeys, false, endSelect);
             pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
+        } else {
+            madeReadProgressLastPoll = true; //no work is also "progress"
         }
 
         long endIo = time.nanoseconds();
@@ -339,10 +381,16 @@ public class Selector implements Selectable, AutoCloseable {
         addToCompletedReceives();
     }
 
-    private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
+    /**
+     * handle any ready I/O on a set of selection keys
+     * @param selectionKeys set of keys to handle
+     * @param isImmediatelyConnected true if running over a set of keys for just-connected sockets
+     * @param currentTimeNanos time at which set of keys was determined
+     */
+    private void pollSelectionKeys(Set<SelectionKey> selectionKeys,
                                    boolean isImmediatelyConnected,
                                    long currentTimeNanos) {
-        Iterator<SelectionKey> iterator = selectionKeys.iterator();
+        Iterator<SelectionKey> iterator = determineHandlingOrder(selectionKeys).iterator();
         while (iterator.hasNext()) {
             SelectionKey key = iterator.next();
             iterator.remove();
@@ -372,14 +420,18 @@ public class Selector implements Selectable, AutoCloseable {
                 }
 
                 /* if channel is not ready finish prepare */
-                if (channel.isConnected() && !channel.ready())
+                if (channel.isConnected() && !channel.ready()) {
                     channel.prepare();
+                }
+
+                attemptRead(key, channel);
 
-                /* if channel is ready read from any connections that have readable data */
-                if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
-                    NetworkReceive networkReceive;
-                    while ((networkReceive = channel.read()) != null)
-                        addToStagedReceives(channel, networkReceive);
+                if (channel.hasBytesBuffered()) {
+                    //this channel has bytes enqueued in intermediate buffers that we could not read
+                    //(possibly because we ran out of memory). it may be that the underlying socket will
+                    //not show up in the next poll() and so we need to remember this channel for the
+                    //next poll call, otherwise data may be stuck in those buffers forever.
+                    keysWithBufferedRead.add(key);
                 }
 
                 /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
@@ -408,6 +460,39 @@ public class Selector implements Selectable, AutoCloseable {
         }
     }
 
+    private Collection<SelectionKey> determineHandlingOrder(Set<SelectionKey> selectionKeys) {
+        //it is possible that the iteration order over selectionKeys is the same every invocation.
+        //this may cause starvation of reads when memory is low. to address this we shuffle the keys if memory is low.
+        Collection<SelectionKey> inHandlingOrder;
+
+        if (!outOfMemory && memoryPool.availableMemory() < lowMemThreshold) {
+            List<SelectionKey> temp = new ArrayList<>(selectionKeys);
+            Collections.shuffle(temp);
+            inHandlingOrder = temp;
+        } else {
+            inHandlingOrder = selectionKeys;
+        }
+        return inHandlingOrder;
+    }
+
+    private void attemptRead(SelectionKey key, KafkaChannel channel) throws IOException {
+        //if channel is ready and has bytes to read from socket or buffer, and has no
+        //previous receive(s) already staged or otherwise in progress then read from it
+        if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasStagedReceive(channel)
+            && !explicitlyMutedChannels.contains(channel)) {
+            NetworkReceive networkReceive;
+            while ((networkReceive = channel.read()) != null) {
+                madeReadProgressLastPoll = true;
+                addToStagedReceives(channel, networkReceive);
+            }
+            if (channel.isMute()) {
+                outOfMemory = true; //channel has muted itself due to memory pressure.
+            } else {
+                madeReadProgressLastPoll = true;
+            }
+        }
+    }
+
     // Record time spent in pollSelectionKeys for channel (moved into a method to keep checkstyle happy)
     private void maybeRecordTimePerConnection(KafkaChannel channel, long startTimeNanos) {
         if (recordTimePerConnection)
@@ -442,6 +527,7 @@ public class Selector implements Selectable, AutoCloseable {
 
     private void mute(KafkaChannel channel) {
         channel.mute();
+        explicitlyMutedChannels.add(channel);
     }
 
     @Override
@@ -451,6 +537,7 @@ public class Selector implements Selectable, AutoCloseable {
     }
 
     private void unmute(KafkaChannel channel) {
+        explicitlyMutedChannels.remove(channel);
         channel.unmute();
     }
 
@@ -509,6 +596,7 @@ public class Selector implements Selectable, AutoCloseable {
             this.disconnected.put(channel, ChannelState.FAILED_SEND);
         }
         this.failedSends.clear();
+        this.madeReadProgressLastPoll = false;
     }
 
     /**
@@ -674,7 +762,7 @@ public class Selector implements Selectable, AutoCloseable {
             while (iter.hasNext()) {
                 Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next();
                 KafkaChannel channel = entry.getKey();
-                if (!channel.isMute()) {
+                if (!explicitlyMutedChannels.contains(channel)) {
                     Deque<NetworkReceive> deque = entry.getValue();
                     addToCompletedReceives(channel, deque);
                     if (deque.isEmpty())
@@ -900,4 +988,13 @@ public class Selector implements Selectable, AutoCloseable {
         }
     }
 
+    //package-private for testing
+    boolean isOutOfMemory() {
+        return outOfMemory;
+    }
+
+    //package-private for testing
+    boolean isMadeReadProgressLastPoll() {
+        return madeReadProgressLastPoll;
+    }
 }
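
Putting the Selector changes together, a processing loop over a pool-backed
selector might look roughly like this (a sketch; the loop class and process()
are hypothetical):

    import java.io.IOException;
    import org.apache.kafka.common.network.NetworkReceive;
    import org.apache.kafka.common.network.Selector;

    class ProcessorLoopSketch {
        private volatile boolean running = true;

        void run(Selector selector) throws IOException {
            while (running) {
                selector.poll(300);
                for (NetworkReceive receive : selector.completedReceives()) {
                    //downstream processing eventually releases the receive's buffer
                    //back to the pool, which lets muted channels be unmuted
                    process(receive);
                }
            }
        }

        void process(NetworkReceive receive) {
            //hand off to request handling
        }
    }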

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
index bc34b70..01b72fe 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
@@ -22,6 +22,7 @@ import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
 import java.util.Map;
 
+import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.security.auth.PrincipalBuilder;
 import org.apache.kafka.common.security.ssl.SslFactory;
 import org.apache.kafka.common.KafkaException;
@@ -50,12 +51,13 @@ public class SslChannelBuilder implements ChannelBuilder {
         }
     }
 
-    public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException {
+    @Override
+    public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException {
         try {
             SslTransportLayer transportLayer = buildTransportLayer(sslFactory, id, key, peerHost(key));
             Authenticator authenticator = new DefaultAuthenticator();
             authenticator.configure(transportLayer, this.principalBuilder, this.configs);
-            return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
+            return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize, memoryPool != null ? memoryPool : MemoryPool.NONE);
         } catch (Exception e) {
             log.info("Failed to create channel due to ", e);
             throw new KafkaException(e);

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
index eb423e3..3cd0114 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
@@ -739,6 +739,11 @@ public class SslTransportLayer implements TransportLayer {
     }
 
     @Override
+    public boolean hasBytesBuffered() {
+        return netReadBuffer.position() != 0 || appReadBuffer.position() != 0;
+    }
+
+    @Override
     public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
         return fileChannel.transferTo(position, count, this);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
index fad0cea..be56ad5 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
@@ -85,6 +85,11 @@ public interface TransportLayer extends ScatteringByteChannel, GatheringByteChan
     boolean isMute();
 
     /**
+     * @return true if channel has bytes to be read in any intermediate buffers
+     */
+    boolean hasBytesBuffered();
+
+    /**
      * Transfers bytes from `fileChannel` to this `TransportLayer`.
      *
      * This method will delegate to {@link FileChannel#transferTo(long, long, java.nio.channels.WritableByteChannel)},
@@ -99,5 +104,4 @@ public interface TransportLayer extends ScatteringByteChannel, GatheringByteChan
      * @see FileChannel#transferTo(long, long, java.nio.channels.WritableByteChannel)
      */
     long transferFrom(FileChannel fileChannel, long position, long count) throws IOException;
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java b/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
new file mode 100644
index 0000000..5d39dff
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Type;
+
+public class ProtoUtils {
+    public static void walk(Schema schema, SchemaVisitor visitor) {
+        if (schema == null || visitor == null) {
+            throw new IllegalArgumentException("Both schema and visitor must be provided");
+        }
+        handleNode(schema, visitor);
+    }
+
+    private static void handleNode(Type node, SchemaVisitor visitor) {
+        if (node instanceof Schema) {
+            Schema schema = (Schema) node;
+            visitor.visit(schema);
+            for (Field f : schema.fields()) {
+                handleNode(f.type, visitor);
+            }
+        } else if (node instanceof ArrayOf) {
+            ArrayOf array = (ArrayOf) node;
+            visitor.visit(array);
+            handleNode(array.type(), visitor);
+        } else {
+            visitor.visit(node);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 329d99b..ee40133 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -21,10 +21,13 @@ import org.apache.kafka.common.protocol.types.Field;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Type;
 
+import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.kafka.common.protocol.types.Type.BOOLEAN;
 import static org.apache.kafka.common.protocol.types.Type.BYTES;
@@ -32,6 +35,7 @@ import static org.apache.kafka.common.protocol.types.Type.INT16;
 import static org.apache.kafka.common.protocol.types.Type.INT32;
 import static org.apache.kafka.common.protocol.types.Type.INT64;
 import static org.apache.kafka.common.protocol.types.Type.INT8;
+import static org.apache.kafka.common.protocol.types.Type.NULLABLE_BYTES;
 import static org.apache.kafka.common.protocol.types.Type.RECORDS;
 import static org.apache.kafka.common.protocol.types.Type.STRING;
 import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING;
@@ -1825,6 +1829,7 @@ public class Protocol {
     public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
     public static final Schema[][] RESPONSES = new Schema[ApiKeys.MAX_API_KEY + 1][];
     static final short[] MIN_VERSIONS = new short[ApiKeys.MAX_API_KEY + 1];
+    static final EnumSet<ApiKeys> DELAYED_DEALLOCATION_REQUESTS; //initialized in static block
 
     /* the latest version of each api */
     static final short[] CURR_VERSION = new short[ApiKeys.MAX_API_KEY + 1];
@@ -1924,6 +1929,44 @@ public class Protocol {
                     throw new IllegalStateException("Request and response for version " + i + " of API "
                             + api.id + " are defined inconsistently. One is null while the other is not null.");
         }
+
+        /* go over all request schemata and find those that retain links to the underlying ByteBuffer from
+         * which they were read out. knowing which requests do (and do not) retain a reference to the buffer
+         * is needed to enable buffers to be released as soon as possible for requests that no longer need them */
+        Set<ApiKeys> requestsWithBufferRefs = new HashSet<>();
+        for (int reqId = 0; reqId < REQUESTS.length; reqId++) {
+            ApiKeys requestType = ApiKeys.forId(reqId);
+            Schema[] schemata = REQUESTS[reqId];
+            if (schemata == null) {
+                continue;
+            }
+            for (Schema requestVersionSchema : schemata) {
+                if (retainsBufferReference(requestVersionSchema)) {
+                    requestsWithBufferRefs.add(requestType);
+                    break; //Kafka is loose with versions, so if _ANY_ version retains buffers we must assume all do.
+                }
+            }
+        }
+
+        DELAYED_DEALLOCATION_REQUESTS = EnumSet.copyOf(requestsWithBufferRefs);
+    }
+
+    private static boolean retainsBufferReference(Schema schema) {
+        if (schema == null) {
+            return false;
+        }
+        final AtomicReference<Boolean> foundBufferReference = new AtomicReference<>(Boolean.FALSE);
+        SchemaVisitor detector = new SchemaVisitorAdapter() {
+            @Override
+            public void visit(Type field) {
+                if (field == BYTES || field == NULLABLE_BYTES || field == RECORDS) {
+                    foundBufferReference.set(Boolean.TRUE);
+                }
+            }
+        };
+        ProtoUtils.walk(schema, detector);
+        return foundBufferReference.get();
     }
 
     public static boolean apiVersionSupported(short apiKey, short apiVersion) {
@@ -1936,6 +1979,10 @@ public class Protocol {
                 0);
     }
 
+    public static boolean requiresDelayedDeallocation(int apiKey) {
+        return DELAYED_DEALLOCATION_REQUESTS.contains(ApiKeys.forId(apiKey));
+    }
+    
     private static String indentString(int size) {
         StringBuilder b = new StringBuilder(size);
         for (int i = 0; i < size; i++)
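
A sketch of how a broker-side caller might use requiresDelayedDeallocation() to
return a request's buffer to the pool as early as possible (the surrounding
names are hypothetical):

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.memory.MemoryPool;
    import org.apache.kafka.common.protocol.Protocol;

    class EarlyReleaseSketch {
        static void maybeReleaseEarly(int apiKey, MemoryPool pool, ByteBuffer underlyingBuffer) {
            if (!Protocol.requiresDelayedDeallocation(apiKey)) {
                pool.release(underlyingBuffer); //safe: the parsed request copied out all it needs
            }
            //otherwise the release must wait until request processing completes
        }
    }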

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitor.java b/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitor.java
new file mode 100644
index 0000000..e61cc77
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitor.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Type;
+
+public interface SchemaVisitor {
+    void visit(Schema schema);
+    void visit(ArrayOf array);
+    void visit(Type field);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitorAdapter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitorAdapter.java b/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitorAdapter.java
new file mode 100644
index 0000000..62834d0
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitorAdapter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Type;
+
+public abstract class SchemaVisitorAdapter implements SchemaVisitor {
+    @Override
+    public void visit(Schema schema) {
+        //nop
+    }
+
+    @Override
+    public void visit(ArrayOf array) {
+        //nop
+    }
+
+    @Override
+    public void visit(Type field) {
+        //nop
+    }
+}
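
Combined with ProtoUtils.walk() above, the adapter lets a visitor override only
the callback it cares about. A hedged sketch that counts array-typed nodes in a
schema (the counter class is hypothetical):

    import java.util.concurrent.atomic.AtomicInteger;
    import org.apache.kafka.common.protocol.ProtoUtils;
    import org.apache.kafka.common.protocol.SchemaVisitorAdapter;
    import org.apache.kafka.common.protocol.types.ArrayOf;
    import org.apache.kafka.common.protocol.types.Schema;

    class ArrayCounterSketch {
        static int countArrays(Schema schema) {
            final AtomicInteger arrays = new AtomicInteger();
            ProtoUtils.walk(schema, new SchemaVisitorAdapter() {
                @Override
                public void visit(ArrayOf array) {
                    arrays.incrementAndGet(); //only this callback is overridden; the rest stay no-ops
                }
            });
            return arrays.get();
        }
    }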

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java
index 605174d..29a89d4 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java
@@ -70,4 +70,9 @@ public class Field {
         return schema;
     }
 
+
+    @Override
+    public String toString() {
+        return name + ":" + type;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
index fbb520c..a9c08aa 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
@@ -139,9 +139,7 @@ public class Schema extends Type {
         StringBuilder b = new StringBuilder();
         b.append('{');
         for (int i = 0; i < this.fields.length; i++) {
-            b.append(this.fields[i].name);
-            b.append(':');
-            b.append(this.fields[i].type());
+            b.append(this.fields[i].toString());
             if (i < this.fields.length - 1)
                 b.append(',');
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index e997fef..75f8cf7 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.utils;
 
+import java.text.DecimalFormat;
 import org.apache.kafka.common.KafkaException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,6 +63,11 @@ public class Utils {
     // IPv6 is supported with [ip] pattern
     private static final Pattern HOST_PORT_PATTERN = Pattern.compile(".*?\\[?([0-9a-zA-Z\\-%._:]*)\\]?:([0-9]+)");
 
+    // Prints up to 2 decimal digits. Used for human readable printing
+    private static final DecimalFormat TWO_DIGIT_FORMAT = new DecimalFormat("0.##");
+
+    private static final String[] BYTE_SCALE_SUFFIXES = new String[] {"B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB"};
+
     public static final String NL = System.getProperty("line.separator");
 
     private static final Logger log = LoggerFactory.getLogger(Utils.class);
@@ -379,6 +385,28 @@ public class Utils {
     }
 
     /**
+     * Formats a byte count as a human-readable String ("3.2 MB")
+     * @param bytes some size in bytes
+     * @return a human-readable representation of the given byte count
+     */
+    public static String formatBytes(long bytes) {
+        if (bytes < 0) {
+            return "" + bytes;
+        }
+        double asDouble = (double) bytes;
+        int ordinal = (int) Math.floor(Math.log(asDouble) / Math.log(1024.0));
+        double scale = Math.pow(1024.0, ordinal);
+        double scaled = asDouble / scale;
+        String formatted = TWO_DIGIT_FORMAT.format(scaled);
+        try {
+            return formatted + " " + BYTE_SCALE_SUFFIXES[ordinal];
+        } catch (IndexOutOfBoundsException e) {
+            //ordinal exceeds the suffix table (or underflows for bytes == 0); fall back to the raw value
+            return "" + asDouble;
+        }
+    }
+
+    /**
      * Create a string representation of an array joined by the given separator
      * @param strs The array of items
      * @param separator The separator
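
As a quick sanity check of the formatting logic above (expected values derived from
the code, not taken from this commit's tests):

    Utils.formatBytes(-1);       // "-1"     (negative sizes pass through unformatted)
    Utils.formatBytes(1023);     // "1023 B"
    Utils.formatBytes(1024);     // "1 KB"
    Utils.formatBytes(3355443);  // "3.2 MB" (3355443 / 1024^2 is ~3.2)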
@@ -613,7 +641,7 @@ public class Utils {
         } catch (IOException outer) {
             try {
                 Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
-                log.debug("Non-atomic move of {} to {} succeeded after atomic move failed due to {}", source, target, 
+                log.debug("Non-atomic move of {} to {} succeeded after atomic move failed due to {}", source, target,
                         outer.getMessage());
             } catch (IOException inner) {
                 inner.addSuppressed(outer);
@@ -632,7 +660,8 @@ public class Utils {
         IOException exception = null;
         for (Closeable closeable : closeables) {
             try {
-                closeable.close();
+                if (closeable != null)
+                    closeable.close();
             } catch (IOException e) {
                 if (exception != null)
                     exception.addSuppressed(e);
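
The added null check makes the loop tolerant of resources that were never opened.
Assuming the surrounding method is Utils.closeAll(Closeable...), a sketch:

    InputStream in = null;                 // e.g. opening failed earlier
    OutputStream out = openOutputOrNull(); // hypothetical helper, may also be null
    Utils.closeAll(in, out);               // nulls are now skipped instead of throwing NPE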

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/test/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPoolTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPoolTest.java b/clients/src/test/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPoolTest.java
new file mode 100644
index 0000000..788d447
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPoolTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.memory;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.common.utils.Utils;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class GarbageCollectedMemoryPoolTest {
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testZeroSize() throws Exception {
+        new GarbageCollectedMemoryPool(0, 7, true, null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testNegativeSize() throws Exception {
+        new GarbageCollectedMemoryPool(-1, 7, false, null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testZeroMaxAllocation() throws Exception {
+        new GarbageCollectedMemoryPool(100, 0, true, null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testNegativeMaxAllocation() throws Exception {
+        new GarbageCollectedMemoryPool(100, -1, false, null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testMaxAllocationLargerThanSize() throws Exception {
+        new GarbageCollectedMemoryPool(100, 101, true, null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testAllocationOverMaxAllocation() throws Exception {
+        GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(1000, 10, false, null);
+        pool.tryAllocate(11);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testAllocationZero() throws Exception {
+        GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(1000, 10, true, null);
+        pool.tryAllocate(0);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testAllocationNegative() throws Exception {
+        GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(1000, 10, false, null);
+        pool.tryAllocate(-1);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testReleaseNull() throws Exception {
+        GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(1000, 10, true, null);
+        pool.release(null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testReleaseForeignBuffer() throws Exception {
+        GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(1000, 10, true, null);
+        ByteBuffer fellOffATruck = ByteBuffer.allocate(1);
+        pool.release(fellOffATruck);
+    }
+
+    @Test
+    public void testDoubleFree() throws Exception {
+        GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(1000, 10, false, null);
+        ByteBuffer buffer = pool.tryAllocate(5);
+        Assert.assertNotNull(buffer);
+        pool.release(buffer); //so far so good
+        try {
+            pool.release(buffer);
+            Assert.fail("2nd release() should have failed");
+        } catch (IllegalArgumentException e) {
+            //as expected
+        } catch (Throwable t) {
+            Assert.fail("expected an IllegalArgumentException. instead got " + t);
+        }
+    }
+
+    @Test
+    public void testAllocationBound() throws Exception {
+        GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(21, 10, false, null);
+        ByteBuffer buf1 = pool.tryAllocate(10);
+        Assert.assertNotNull(buf1);
+        Assert.assertEquals(10, buf1.capacity());
+        ByteBuffer buf2 = pool.tryAllocate(10);
+        Assert.assertNotNull(buf2);
+        Assert.assertEquals(10, buf2.capacity());
+        ByteBuffer buf3 = pool.tryAllocate(10);
+        Assert.assertNotNull(buf3);
+        Assert.assertEquals(10, buf3.capacity());
+        //no more allocations
+        Assert.assertNull(pool.tryAllocate(1));
+        //release a buffer
+        pool.release(buf3);
+        //now we can have more
+        ByteBuffer buf4 = pool.tryAllocate(10);
+        Assert.assertNotNull(buf4);
+        Assert.assertEquals(10, buf4.capacity());
+        //no more allocations
+        Assert.assertNull(pool.tryAllocate(1));
+    }
+
+    @Test
+    public void testBuffersGarbageCollected() throws Exception {
+        Runtime runtime = Runtime.getRuntime();
+        long maxHeap = runtime.maxMemory(); //in bytes
+        long maxPool = maxHeap / 2;
+        long maxSingleAllocation = maxPool / 10;
+        Assert.assertTrue(maxSingleAllocation < Integer.MAX_VALUE / 2); //guard: fails if the test JVM has too much heap for this test's sizing logic
+        GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(maxPool, (int) maxSingleAllocation, false, null);
+
+        //we will allocate 30 buffers from this pool, which is sized such that at most
+        //11 should coexist, while all 30 together would not fit in the JVM heap, proving that:
+        // 1. buffers were reclaimed and
+        // 2. the pool registered the reclamation.
+
+        int timeoutSeconds = 30;
+        long giveUp = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(timeoutSeconds);
+        boolean success = false;
+
+        int buffersAllocated = 0;
+        while (System.currentTimeMillis() < giveUp) {
+            ByteBuffer buffer = pool.tryAllocate((int) maxSingleAllocation);
+            if (buffer == null) {
+                System.gc();
+                Thread.sleep(10);
+                continue;
+            }
+            buffersAllocated++;
+            if (buffersAllocated >= 30) {
+                success = true;
+                break;
+            }
+        }
+
+        Assert.assertTrue("failed to allocate 30 buffers in " + timeoutSeconds + " seconds."
+                + " buffers allocated: " + buffersAllocated + " heap " + Utils.formatBytes(maxHeap)
+                + " pool " + Utils.formatBytes(maxPool) + " single allocation "
+                + Utils.formatBytes(maxSingleAllocation), success);
+    }
+}
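
For context, a minimal usage sketch of the pool exercised above (constructor
arguments mirror the tests; the request handling in the middle is hypothetical):

    MemoryPool pool = new GarbageCollectedMemoryPool(1024, 512, false, null);
    ByteBuffer buf = pool.tryAllocate(256);
    if (buf != null) { // null means the pool cannot satisfy the request right now
        try {
            // ... read a request into buf and process it ...
        } finally {
            pool.release(buf);
        }
    }
    // if release() is ever forgotten, this implementation still reclaims the
    // capacity once the buffer itself is garbage-collected (hence the name)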

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
index 85c5002..fd4ef69 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
@@ -94,14 +94,15 @@ public class NioEchoServer extends Thread {
                 List<NetworkReceive> completedReceives = selector.completedReceives();
                 for (NetworkReceive rcv : completedReceives) {
                     KafkaChannel channel = channel(rcv.source());
-                    channel.mute();
+                    String channelId = channel.id();
+                    selector.mute(channelId);
                     NetworkSend send = new NetworkSend(rcv.source(), rcv.payload());
                     if (outputChannel == null)
                         selector.send(send);
                     else {
                         for (ByteBuffer buffer : send.buffers)
                             outputChannel.write(buffer);
-                        channel.unmute();
+                        selector.unmute(channelId);
                     }
                 }
                 for (Send send : selector.completedSends())
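
Routing mute/unmute through the selector (rather than the channel) matters once a
bounded memory pool is in play: the selector mutes channels itself when the pool
runs dry and unmutes them as memory frees up, so it needs to know which channels
the caller muted explicitly. A sketch of the intended contract (paraphrased, not
verbatim from this commit):

    selector.mute(channelId);   // recorded as explicitly muted; memory becoming
                                // available will not unmute it
    selector.unmute(channelId); // only an explicit unmute clears that state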

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/test/java/org/apache/kafka/common/network/PlaintextSender.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/PlaintextSender.java b/clients/src/test/java/org/apache/kafka/common/network/PlaintextSender.java
new file mode 100644
index 0000000..3338d03
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/network/PlaintextSender.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+
+/**
+ * Test helper class that connects to a given server address, writes out the given payload, and disconnects.
+ */
+public class PlaintextSender extends Thread {
+
+    public PlaintextSender(final InetSocketAddress serverAddress, final byte[] payload) {
+        super(new Runnable() {
+            @Override
+            public void run() {
+                try (Socket connection = new Socket(serverAddress.getAddress(), serverAddress.getPort());
+                     OutputStream os = connection.getOutputStream()) {
+                    os.write(payload);
+                    os.flush();
+                } catch (Exception e) {
+                    e.printStackTrace(System.err);
+                }
+            }
+        });
+        setDaemon(true);
+        setName("PlaintextSender - " + payload.length + " bytes @ " + serverAddress);
+    }
+}
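
Typical usage (port and payload are hypothetical):

    InetSocketAddress addr = new InetSocketAddress("localhost", serverPort);
    PlaintextSender sender = new PlaintextSender(addr, payload);
    sender.start();
    sender.join(5000); // allow up to 5s for the connect/flush/disconnect cycle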