You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2017/07/26 06:20:07 UTC
[1/2] kafka git commit: KAFKA-4602;
KIP-72 - Allow putting a bound on memory consumed by Incoming requests
Repository: kafka
Updated Branches:
refs/heads/trunk f15cdbc91 -> 47ee8e954
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index cc061da..cc6a394 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -22,13 +22,22 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
+import java.util.Random;
+import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.memory.SimpleMemoryPool;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.utils.MockTime;
@@ -50,7 +59,7 @@ public class SelectorTest {
protected Time time;
protected Selector selector;
protected ChannelBuilder channelBuilder;
- private Metrics metrics;
+ protected Metrics metrics;
@Before
public void setUp() throws Exception {
@@ -322,6 +331,87 @@ public class SelectorTest {
assertTrue("Unexpected receive", selector.completedReceives().isEmpty());
}
+    /**
+     * Checks that the selector stops completing receives once its memory pool is exhausted
+     * and resumes after the memory is handed back.
+     *
+     * The pool holds 900 bytes and each of the two senders announces a 900 byte request, so
+     * only one request can be buffered at a time: the first completed receive must drain the
+     * pool, the second one can only complete after the first receive is closed.
+     */
+    @Test
+    public void testMuteOnOOM() throws Exception {
+        //clean up default selector, replace it with one that uses a finite mem pool
+        selector.close();
+        MemoryPool pool = new SimpleMemoryPool(900, 900, false, null);
+        selector = new Selector(NetworkReceive.UNLIMITED, 5000, metrics, time, "MetricGroup",
+            new HashMap<String, String>(), true, false, channelBuilder, pool);
+
+        try (ServerSocketChannel ss = ServerSocketChannel.open()) {
+            ss.bind(new InetSocketAddress(0));
+
+            InetSocketAddress serverAddress = (InetSocketAddress) ss.getLocalAddress();
+
+            Thread sender1 = createSender(serverAddress, randomPayload(900));
+            Thread sender2 = createSender(serverAddress, randomPayload(900));
+            sender1.start();
+            sender2.start();
+
+            //wait until everything has been flushed out to network (assuming payload size is smaller than OS buffer size)
+            //this is important because we assume both requests' prefixes (1st 4 bytes) have made it.
+            sender1.join(5000);
+            sender2.join(5000);
+
+            SocketChannel channelX = ss.accept(); //not defined if it's 1 or 2
+            channelX.configureBlocking(false);
+            SocketChannel channelY = ss.accept();
+            channelY.configureBlocking(false);
+            selector.register("clientX", channelX);
+            selector.register("clientY", channelY);
+
+            //phase 1: poll until exactly one of the two requests has been read
+            List<NetworkReceive> completed = Collections.emptyList();
+            long deadline = System.currentTimeMillis() + 5000;
+            while (System.currentTimeMillis() < deadline && completed.isEmpty()) {
+                selector.poll(1000);
+                completed = selector.completedReceives();
+            }
+            assertEquals("could not read a single request within timeout", 1, completed.size());
+            NetworkReceive firstReceive = completed.get(0);
+            assertEquals(0, pool.availableMemory());
+            assertTrue(selector.isOutOfMemory());
+
+            //phase 2: with the pool drained, another poll must not produce the second request
+            selector.poll(10);
+            assertTrue(selector.completedReceives().isEmpty());
+            assertEquals(0, pool.availableMemory());
+            assertTrue(selector.isOutOfMemory());
+
+            firstReceive.close();
+            assertEquals(900, pool.availableMemory()); //memory has been released back to pool
+
+            //phase 3: now that memory is available again the remaining request must come through
+            completed = Collections.emptyList();
+            deadline = System.currentTimeMillis() + 5000;
+            while (System.currentTimeMillis() < deadline && completed.isEmpty()) {
+                selector.poll(1000);
+                completed = selector.completedReceives();
+            }
+            //assert on the list captured by the loop, with a message that names the second request
+            assertEquals("could not read remaining request within timeout", 1, completed.size());
+            assertEquals(0, pool.availableMemory());
+            assertFalse(selector.isOutOfMemory());
+        }
+    }
+
+    /**
+     * Builds (but does not start) a plaintext sender thread that targets the given address
+     * with the given payload.
+     */
+    private Thread createSender(InetSocketAddress serverAddress, byte[] payload) {
+        Thread plaintextSender = new PlaintextSender(serverAddress, payload);
+        return plaintextSender;
+    }
+
+    /**
+     * Creates a size-prefixed request: a 4 byte length header holding {@code sizeBytes},
+     * followed by {@code sizeBytes} random bytes.
+     *
+     * @param sizeBytes number of random body bytes that follow the header
+     * @return an array of {@code sizeBytes + 4} bytes
+     */
+    protected byte[] randomPayload(int sizeBytes) throws Exception {
+        byte[] framed = new byte[sizeBytes + 4];
+        new Random().nextBytes(framed);
+
+        //encode the length header the same way DataOutputStream.writeInt does
+        ByteArrayOutputStream headerBytes = new ByteArrayOutputStream();
+        try (DataOutputStream headerWriter = new DataOutputStream(headerBytes)) {
+            headerWriter.writeInt(sizeBytes);
+        }
+        byte[] header = headerBytes.toByteArray();
+
+        //the header overwrites the first random bytes, the rest stays random
+        System.arraycopy(header, 0, framed, 0, header.length);
+        return framed;
+    }
+
private String blockingRequest(String node, String s) throws IOException {
selector.send(createSend(node, s));
selector.poll(1000L);
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
index e272855..46d3b79 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
@@ -17,17 +17,23 @@
package org.apache.kafka.common.network;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.nio.channels.SelectionKey;
+import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
+import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.memory.SimpleMemoryPool;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.protocol.SecurityProtocol;
@@ -35,6 +41,7 @@ import org.apache.kafka.common.security.ssl.SslFactory;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.TestSslUtils;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -43,7 +50,6 @@ import org.junit.Test;
*/
public class SslSelectorTest extends SelectorTest {
- private Metrics metrics;
private Map<String, Object> sslClientConfigs;
@Before
@@ -160,6 +166,90 @@ public class SslSelectorTest extends SelectorTest {
}
+ /**
+ * SSL variant of the out-of-memory test: a selector backed by a 900 byte pool is fed two
+ * requests created with randomPayload(900). The test expects exactly one request to be
+ * received while the pool is drained, and the other one only after the first receive has
+ * been closed and its memory returned.
+ */
+ @Override
+ public void testMuteOnOOM() throws Exception {
+ //clean up default selector, replace it with one that uses a finite mem pool
+ selector.close();
+ MemoryPool pool = new SimpleMemoryPool(900, 900, false, null);
+ //the initial channel builder is for clients, we need a server one
+ File trustStoreFile = File.createTempFile("truststore", ".jks");
+ Map<String, Object> sslServerConfigs = TestSslUtils.createSslConfig(false, true, Mode.SERVER, trustStoreFile, "server");
+ sslServerConfigs.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
+ channelBuilder = new SslChannelBuilder(Mode.SERVER);
+ channelBuilder.configure(sslServerConfigs);
+ selector = new Selector(NetworkReceive.UNLIMITED, 5000, metrics, time, "MetricGroup",
+ new HashMap<String, String>(), true, false, channelBuilder, pool);
+
+ try (ServerSocketChannel ss = ServerSocketChannel.open()) {
+ ss.bind(new InetSocketAddress(0));
+
+ InetSocketAddress serverAddress = (InetSocketAddress) ss.getLocalAddress();
+
+ SslSender sender1 = createSender(serverAddress, randomPayload(900));
+ SslSender sender2 = createSender(serverAddress, randomPayload(900));
+ sender1.start();
+ sender2.start();
+
+ //unlike the plaintext test the senders are not joined here; the loop below keeps polling
+ //and checks each sender's handshake latch instead
+ SocketChannel channelX = ss.accept(); //not defined if it's 1 or 2
+ channelX.configureBlocking(false);
+ SocketChannel channelY = ss.accept();
+ channelY.configureBlocking(false);
+ selector.register("clientX", channelX);
+ selector.register("clientY", channelY);
+
+ boolean success = false;
+ NetworkReceive firstReceive = null;
+ long deadline = System.currentTimeMillis() + 5000;
+ //keep calling poll until:
+ //1. both senders have completed the handshakes (so server selector has tried reading both payloads)
+ //2. a single payload is actually read out completely (the other is too big to fit)
+ while (System.currentTimeMillis() < deadline) {
+ selector.poll(10);
+
+ List<NetworkReceive> completed = selector.completedReceives();
+ if (firstReceive == null) {
+ if (!completed.isEmpty()) {
+ assertEquals("expecting a single request", 1, completed.size());
+ firstReceive = completed.get(0);
+ assertTrue(selector.isMadeReadProgressLastPoll());
+ assertEquals(0, pool.availableMemory());
+ }
+ } else {
+ //once the first request holds the whole pool, no later poll may complete another one
+ assertTrue("only expecting single request", completed.isEmpty());
+ }
+
+ //each wait is capped at 1 ms so the loop keeps polling the selector
+ boolean handshaked = sender1.waitForHandshake(1);
+ handshaked = handshaked && sender2.waitForHandshake(1);
+
+ if (handshaked && firstReceive != null) {
+ success = true;
+ break;
+ }
+ }
+ if (!success) {
+ Assert.fail("could not initiate connections within timeout");
+ }
+
+ //with the pool still drained, one more poll must not yield the second request
+ selector.poll(10);
+ assertTrue(selector.completedReceives().isEmpty());
+ assertEquals(0, pool.availableMemory());
+ assertTrue(selector.isOutOfMemory());
+
+ firstReceive.close();
+ assertEquals(900, pool.availableMemory()); //memory has been released back to pool
+
+ //poll until the remaining request is read or the 5 second deadline passes
+ List<NetworkReceive> completed = Collections.emptyList();
+ deadline = System.currentTimeMillis() + 5000;
+ while (System.currentTimeMillis() < deadline && completed.isEmpty()) {
+ selector.poll(1000);
+ completed = selector.completedReceives();
+ }
+ assertEquals("could not read remaining request within timeout", 1, completed.size());
+ assertEquals(0, pool.availableMemory());
+ assertFalse(selector.isOutOfMemory());
+ }
+ }
+
/**
* Connects and waits for handshake to complete. This is required since SslTransportLayer
* implementation requires the channel to be ready before send is invoked (unlike plaintext
@@ -169,4 +259,7 @@ public class SslSelectorTest extends SelectorTest {
blockingConnect(node, serverAddr);
}
+    /**
+     * Builds (but does not start) an SSL sender thread that targets the given address with
+     * the given payload.
+     */
+    private SslSender createSender(InetSocketAddress serverAddress, byte[] payload) {
+        SslSender sslSender = new SslSender(serverAddress, payload);
+        return sslSender;
+    }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/test/java/org/apache/kafka/common/network/SslSender.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslSender.java b/clients/src/test/java/org/apache/kafka/common/network/SslSender.java
new file mode 100644
index 0000000..cae69cb
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSender.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSocket;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Daemon test thread that connects to a server over TLSv1.2, performs the handshake and then
+ * writes a fixed payload. Any certificate presented by the server is accepted.
+ */
+public class SslSender extends Thread {
+
+    private final InetSocketAddress serverAddress;
+    private final byte[] payload;
+    /** Released once the TLS handshake has completed successfully. */
+    private final CountDownLatch handshaked = new CountDownLatch(1);
+
+    /**
+     * @param serverAddress address to connect to when the thread runs
+     * @param payload bytes written to the connection after the handshake
+     */
+    public SslSender(InetSocketAddress serverAddress, byte[] payload) {
+        this.serverAddress = serverAddress;
+        this.payload = payload;
+        setName("SslSender - " + payload.length + " bytes @ " + serverAddress);
+        setDaemon(true);
+    }
+
+    /**
+     * Connects, handshakes, signals the handshake latch and sends the payload. Failures are
+     * printed to stderr; the latch is only released if the handshake succeeded.
+     */
+    @Override
+    public void run() {
+        try {
+            TrustManager[] trustEveryone = new TrustManager[]{new NaiveTrustManager()};
+            SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
+            sslContext.init(null, trustEveryone, new java.security.SecureRandom());
+            try (SSLSocket socket = (SSLSocket) sslContext.getSocketFactory()
+                    .createSocket(serverAddress.getAddress(), serverAddress.getPort())) {
+                OutputStream out = socket.getOutputStream();
+                socket.startHandshake();
+                handshaked.countDown();
+                out.write(payload);
+                out.flush();
+            }
+        } catch (Exception failure) {
+            failure.printStackTrace(System.err);
+        }
+    }
+
+    /**
+     * Waits up to the given time for the handshake to complete.
+     *
+     * @param timeoutMillis maximum time to wait, in milliseconds
+     * @return true if the handshake completed within the timeout
+     */
+    public boolean waitForHandshake(long timeoutMillis) throws InterruptedException {
+        boolean completed = handshaked.await(timeoutMillis, TimeUnit.MILLISECONDS);
+        return completed;
+    }
+
+    /**
+     * blindly trust any certificate presented to it
+     */
+    private static class NaiveTrustManager implements X509TrustManager {
+        @Override
+        public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
+            //nop
+        }
+
+        @Override
+        public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
+            //nop
+        }
+
+        @Override
+        public X509Certificate[] getAcceptedIssuers() {
+            return new X509Certificate[0];
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index bb5d2a7..8338ad7 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -39,6 +39,7 @@ import javax.net.ssl.SSLParameters;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.security.TestSecurityConfig;
import org.apache.kafka.common.security.ssl.SslFactory;
import org.apache.kafka.common.metrics.Metrics;
@@ -580,7 +581,7 @@ public class SslTransportLayerTest {
public void testNetworkThreadTimeRecorded() throws Exception {
selector.close();
this.selector = new Selector(NetworkReceive.UNLIMITED, 5000, new Metrics(), Time.SYSTEM,
- "MetricGroup", new HashMap<String, String>(), false, true, channelBuilder);
+ "MetricGroup", new HashMap<String, String>(), false, true, channelBuilder, MemoryPool.NONE);
String node = "0";
server = createEchoServer(SecurityProtocol.SSL);
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java
new file mode 100644
index 0000000..2b2cc91
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.protocol;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for the schema inspection helpers in {@link Protocol}.
+ */
+public class ProtoUtilsTest {
+    @Test
+    public void testDelayedAllocationSchemaDetection() throws Exception {
+        //verifies that schemas known to retain a reference to the underlying byte buffer are correctly detected.
+        for (ApiKeys key : ApiKeys.values()) {
+            boolean knownToRetainBuffer =
+                key == ApiKeys.PRODUCE || key == ApiKeys.JOIN_GROUP || key == ApiKeys.SYNC_GROUP;
+            boolean detected = Protocol.requiresDelayedDeallocation(key.id);
+            if (!knownToRetainBuffer) {
+                Assert.assertFalse(detected);
+            } else {
+                Assert.assertTrue(detected);
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
index 2d6d05c..3feeff2 100755
--- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -36,6 +36,7 @@ import java.util.Collections;
import java.util.Random;
import static org.apache.kafka.common.utils.Utils.formatAddress;
+import static org.apache.kafka.common.utils.Utils.formatBytes;
import static org.apache.kafka.common.utils.Utils.getHost;
import static org.apache.kafka.common.utils.Utils.getPort;
import static org.junit.Assert.assertArrayEquals;
@@ -78,6 +79,17 @@ public class UtilsTest {
}
@Test
+ // Pins the output of Utils.formatBytes for a negative value, for values on either side of
+ // the KB and MB boundaries, and for a fractional MB value.
+ public void testFormatBytes() {
+ assertEquals("-1", formatBytes(-1));
+ assertEquals("1023 B", formatBytes(1023));
+ assertEquals("1 KB", formatBytes(1024));
+ // one byte short of 1 MB is still reported in KB
+ assertEquals("1024 KB", formatBytes((1024 * 1024) - 1));
+ assertEquals("1 MB", formatBytes(1024 * 1024));
+ assertEquals("1.1 MB", formatBytes((long) (1.1 * 1024 * 1024)));
+ assertEquals("10 MB", formatBytes(10 * 1024 * 1024));
+ }
+
+ @Test
public void testJoin() {
assertEquals("", Utils.join(Collections.emptyList(), ","));
assertEquals("1", Utils.join(Arrays.asList("1"), ","));
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index bd71340..6b8dbaa 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -29,9 +29,10 @@ import kafka.server.QuotaId
import kafka.utils.{Logging, NotNothing}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.InvalidRequestException
+import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.network.{ListenerName, Send}
import org.apache.kafka.common.protocol.{ApiKeys, Protocol, SecurityProtocol}
-import org.apache.kafka.common.record.{RecordBatch, MemoryRecords}
+import org.apache.kafka.common.record.{MemoryRecords, RecordBatch}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.Time
@@ -41,7 +42,7 @@ import scala.reflect.ClassTag
object RequestChannel extends Logging {
val AllDone = Request(processor = 1, connectionId = "2", Session(KafkaPrincipal.ANONYMOUS, InetAddress.getLocalHost),
- buffer = shutdownReceive, startTimeNanos = 0, listenerName = new ListenerName(""),
+ buffer = shutdownReceive, memoryPool = MemoryPool.NONE, startTimeNanos = 0, listenerName = new ListenerName(""),
securityProtocol = SecurityProtocol.PLAINTEXT)
private val requestLogger = Logger.getLogger("kafka.request.logger")
@@ -56,10 +57,12 @@ object RequestChannel extends Logging {
val sanitizedUser = QuotaId.sanitize(principal.getName)
}
- case class Request(processor: Int, connectionId: String, session: Session, private var buffer: ByteBuffer,
- startTimeNanos: Long, listenerName: ListenerName, securityProtocol: SecurityProtocol) {
+ case class Request(processor: Int, connectionId: String, session: Session, buffer: ByteBuffer,
+ private val memoryPool: MemoryPool, startTimeNanos: Long, listenerName: ListenerName,
+ securityProtocol: SecurityProtocol) {
// These need to be volatile because the readers are in the network thread and the writers are in the request
// handler threads or the purgatory threads
+ @volatile var bufferReference = buffer
@volatile var requestDequeueTimeNanos = -1L
@volatile var apiLocalCompleteTimeNanos = -1L
@volatile var responseCompleteTimeNanos = -1L
@@ -104,7 +107,12 @@ object RequestChannel extends Logging {
else
null
- buffer = null
+ //most request types are parsed entirely into objects at this point. for those we can release the underlying buffer.
+ //some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference
+ //to the buffer. for those requests we cannot release the buffer early, but only when request processing is done.
+ if (!Protocol.requiresDelayedDeallocation(requestId)) {
+ dispose()
+ }
def requestDesc(details: Boolean): String = {
if (requestObj != null)
@@ -194,6 +202,13 @@ object RequestChannel extends Logging {
.format(requestDesc(detailsEnabled), connectionId, totalTimeMs, requestQueueTimeMs, apiLocalTimeMs, apiRemoteTimeMs, apiThrottleTimeMs, responseQueueTimeMs, responseSendTimeMs, securityProtocol, session.principal, listenerName.value))
}
}
+
+ // Hands the request buffer back to the memory pool if it has not been handed back yet.
+ // Calling this again after a release is a no-op because bufferReference is nulled below.
+ // NOTE(review): the null check and the release are separate steps; presumably dispose() is
+ // never invoked concurrently from two threads for the same request - confirm.
+ def dispose(): Unit = {
+ if (bufferReference != null) {
+ memoryPool.release(bufferReference)
+ // drop the reference so a later dispose() call does not release the same buffer twice
+ bufferReference = null
+ }
+ }
}
object Response {
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 2ba5553..e541015 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -32,7 +32,9 @@ import kafka.security.CredentialProvider
import kafka.server.KafkaConfig
import kafka.utils._
import org.apache.kafka.common.errors.InvalidRequestException
+import org.apache.kafka.common.memory.{MemoryPool, SimpleMemoryPool}
import org.apache.kafka.common.metrics._
+import org.apache.kafka.common.metrics.stats.Rate
import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, ListenerName, Selectable, Send, Selector => KSelector}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.protocol.SecurityProtocol
@@ -61,6 +63,10 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
this.logIdent = "[Socket Server on Broker " + config.brokerId + "], "
+ private val memoryPoolSensor = metrics.sensor("MemoryPoolUtilization")
+ private val memoryPoolDepletedPercentMetricName = metrics.metricName("MemoryPoolAvgDepletedPercent", "socket-server-metrics")
+ memoryPoolSensor.add(memoryPoolDepletedPercentMetricName, new Rate(TimeUnit.MILLISECONDS))
+ private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE
val requestChannel = new RequestChannel(totalProcessorThreads, maxQueuedRequests)
private val processors = new Array[Processor](totalProcessorThreads)
@@ -86,7 +92,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
val processorEndIndex = processorBeginIndex + numProcessorThreads
for (i <- processorBeginIndex until processorEndIndex)
- processors(i) = newProcessor(i, connectionQuotas, listenerName, securityProtocol)
+ processors(i) = newProcessor(i, connectionQuotas, listenerName, securityProtocol, memoryPool)
val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
@@ -109,7 +115,16 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
}.sum / totalProcessorThreads
}
)
-
+ newGauge("MemoryPoolAvailable",
+ new Gauge[Long] {
+ def value = memoryPool.availableMemory()
+ }
+ )
+ newGauge("MemoryPoolUsed",
+ new Gauge[Long] {
+ def value = memoryPool.size() - memoryPool.availableMemory()
+ }
+ )
info("Started " + acceptors.size + " acceptor threads")
}
@@ -138,7 +153,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
/* `protected` for test usage */
protected[network] def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
- securityProtocol: SecurityProtocol): Processor = {
+ securityProtocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
new Processor(id,
time,
config.socketRequestMaxBytes,
@@ -149,7 +164,8 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
securityProtocol,
config,
metrics,
- credentialProvider
+ credentialProvider,
+ memoryPool
)
}
@@ -378,7 +394,8 @@ private[kafka] class Processor(val id: Int,
securityProtocol: SecurityProtocol,
config: KafkaConfig,
metrics: Metrics,
- credentialProvider: CredentialProvider) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
+ credentialProvider: CredentialProvider,
+ memoryPool: MemoryPool) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
private object ConnectionId {
def fromString(s: String): Option[ConnectionId] = s.split("-") match {
@@ -422,7 +439,8 @@ private[kafka] class Processor(val id: Int,
metricTags,
false,
true,
- ChannelBuilders.serverChannelBuilder(listenerName, securityProtocol, config, credentialProvider.credentialCache))
+ ChannelBuilders.serverChannelBuilder(listenerName, securityProtocol, config, credentialProvider.credentialCache),
+ memoryPool)
override def run() {
startupComplete()
@@ -517,7 +535,8 @@ private[kafka] class Processor(val id: Int,
val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session,
buffer = receive.payload, startTimeNanos = time.nanoseconds,
- listenerName = listenerName, securityProtocol = securityProtocol)
+ listenerName = listenerName, securityProtocol = securityProtocol,
+ memoryPool = memoryPool)
requestChannel.sendRequest(req)
selector.mute(receive.source)
} catch {
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 3941e17..a900e6d 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -52,6 +52,7 @@ object Defaults {
val NumIoThreads = 8
val BackgroundThreads = 10
val QueuedMaxRequests = 500
+ val QueuedMaxRequestBytes = -1
/************* Authorizer Configuration ***********/
val AuthorizerClassName = ""
@@ -236,6 +237,7 @@ object KafkaConfig {
val NumIoThreadsProp = "num.io.threads"
val BackgroundThreadsProp = "background.threads"
val QueuedMaxRequestsProp = "queued.max.requests"
+ val QueuedMaxBytesProp = "queued.max.request.bytes"
val RequestTimeoutMsProp = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG
/************* Authorizer Configuration ***********/
val AuthorizerClassNameProp = "authorizer.class.name"
@@ -420,6 +422,7 @@ object KafkaConfig {
val NumIoThreadsDoc = "The number of threads that the server uses for processing requests, which may include disk I/O"
val BackgroundThreadsDoc = "The number of threads to use for various background processing tasks"
val QueuedMaxRequestsDoc = "The number of queued requests allowed before blocking the network threads"
+ val QueuedMaxRequestBytesDoc = "The number of queued bytes allowed before no more requests are read"
val RequestTimeoutMsDoc = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC
/************* Authorizer Configuration ***********/
val AuthorizerClassNameDoc = "The authorizer class that should be used for authorization"
@@ -684,6 +687,7 @@ object KafkaConfig {
.define(NumIoThreadsProp, INT, Defaults.NumIoThreads, atLeast(1), HIGH, NumIoThreadsDoc)
.define(BackgroundThreadsProp, INT, Defaults.BackgroundThreads, atLeast(1), HIGH, BackgroundThreadsDoc)
.define(QueuedMaxRequestsProp, INT, Defaults.QueuedMaxRequests, atLeast(1), HIGH, QueuedMaxRequestsDoc)
+ .define(QueuedMaxBytesProp, LONG, Defaults.QueuedMaxRequestBytes, MEDIUM, QueuedMaxRequestBytesDoc)
.define(RequestTimeoutMsProp, INT, Defaults.RequestTimeoutMs, HIGH, RequestTimeoutMsDoc)
/************* Authorizer Configuration ***********/
@@ -900,6 +904,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
val numNetworkThreads = getInt(KafkaConfig.NumNetworkThreadsProp)
val backgroundThreads = getInt(KafkaConfig.BackgroundThreadsProp)
val queuedMaxRequests = getInt(KafkaConfig.QueuedMaxRequestsProp)
+ val queuedMaxBytes = getLong(KafkaConfig.QueuedMaxBytesProp)
val numIoThreads = getInt(KafkaConfig.NumIoThreadsProp)
val messageMaxBytes = getInt(KafkaConfig.MessageMaxBytesProp)
val requestTimeoutMs = getInt(KafkaConfig.RequestTimeoutMsProp)
@@ -1191,5 +1196,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
s"Only GSSAPI mechanism is supported for inter-broker communication with SASL when inter.broker.protocol.version is set to $interBrokerProtocolVersionString")
require(!interBrokerUsesSasl || saslEnabledMechanisms.contains(saslMechanismInterBrokerProtocol),
s"${KafkaConfig.SaslMechanismInterBrokerProtocolProp} must be included in ${KafkaConfig.SaslEnabledMechanismsProp} when SASL is used for inter-broker communication")
+ require(queuedMaxBytes <= 0 || queuedMaxBytes >= socketRequestMaxBytes,
+ s"${KafkaConfig.QueuedMaxBytesProp} must be larger or equal to ${KafkaConfig.SocketRequestMaxBytesProp}")
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index feb07b8..512be67 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -40,9 +40,9 @@ class KafkaRequestHandler(id: Int,
private val latch = new CountDownLatch(1)
def run() {
- while (true) {
+ while(true) {
+ var req : RequestChannel.Request = null
try {
- var req : RequestChannel.Request = null
while (req == null) {
// We use a single meter for aggregate idle percentage for the thread pool.
// Since meter is calculated as total_recorded_value / time_window and
@@ -69,6 +69,9 @@ class KafkaRequestHandler(id: Int,
latch.countDown()
Exit.exit(e.statusCode)
case e: Throwable => error("Exception when handling request", e)
+ } finally {
+ if (req != null)
+ req.dispose()
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index acf96e8..ed35269 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -29,6 +29,7 @@ import kafka.security.CredentialProvider
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{ListenerName, NetworkSend, Send}
import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
@@ -328,9 +329,9 @@ class SocketServerTest extends JUnitSuite {
var conn: Socket = null
val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, Time.SYSTEM, credentialProvider) {
override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
- protocol: SecurityProtocol): Processor = {
+ protocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas,
- config.connectionsMaxIdleMs, listenerName, protocol, config, metrics, credentialProvider) {
+ config.connectionsMaxIdleMs, listenerName, protocol, config, metrics, credentialProvider, MemoryPool.NONE) {
override protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send) {
conn.close()
super.sendResponse(response, responseSend)
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index ae1cfc0..38d4bb3 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -35,6 +35,7 @@ import kafka.server._
import kafka.utils.{MockTime, TestUtils, ZkUtils}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.UnsupportedVersionException
+import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
@@ -395,7 +396,7 @@ class KafkaApisTest {
val header = new RequestHeader(builder.apiKey.id, request.version, "", 0)
val buffer = request.serialize(header)
val session = Session(KafkaPrincipal.ANONYMOUS, InetAddress.getLocalHost)
- (request, RequestChannel.Request(1, "1", session, buffer, 0, new ListenerName(""), SecurityProtocol.PLAINTEXT))
+ (request, RequestChannel.Request(1, "1", session, buffer, MemoryPool.NONE, 0, new ListenerName(""), SecurityProtocol.PLAINTEXT))
}
private def readResponse(api: ApiKeys, request: AbstractRequest, capturedResponse: Capture[RequestChannel.Response]): AbstractResponse = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 1b801de..dee6e87 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -545,6 +545,7 @@ class KafkaConfigTest {
case KafkaConfig.NumIoThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.BackgroundThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.QueuedMaxRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
+ case KafkaConfig.QueuedMaxBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.RequestTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.AuthorizerClassNameProp => //ignore string
[2/2] kafka git commit: KAFKA-4602;
KIP-72 - Allow putting a bound on memory consumed by Incoming requests
Posted by ju...@apache.org.
KAFKA-4602; KIP-72 - Allow putting a bound on memory consumed by Incoming requests
this is the initial implementation.
Author: radai-rosenblatt <ra...@gmail.com>
Reviewers: Ewen Cheslack-Postava <me...@ewencp.org>, Ismael Juma <is...@juma.me.uk>, Rajini Sivaram <ra...@googlemail.com>, Jun Rao <ju...@gmail.com>
Closes #2330 from radai-rosenblatt/broker-memory-pool-with-muting
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/47ee8e95
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/47ee8e95
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/47ee8e95
Branch: refs/heads/trunk
Commit: 47ee8e954df62b9a79099e944ec4be29afe046f6
Parents: f15cdbc
Author: radai-rosenblatt <ra...@gmail.com>
Authored: Wed Jul 26 08:19:56 2017 +0200
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Jul 26 08:19:56 2017 +0200
----------------------------------------------------------------------
checkstyle/import-control.xml | 5 +
.../memory/GarbageCollectedMemoryPool.java | 168 +++++++++++++++++++
.../apache/kafka/common/memory/MemoryPool.java | 95 +++++++++++
.../kafka/common/memory/SimpleMemoryPool.java | 137 +++++++++++++++
.../kafka/common/network/ChannelBuilder.java | 7 +-
.../kafka/common/network/KafkaChannel.java | 55 +++++-
.../kafka/common/network/NetworkReceive.java | 50 +++++-
.../common/network/PlaintextChannelBuilder.java | 6 +-
.../common/network/PlaintextTransportLayer.java | 5 +
.../apache/kafka/common/network/Receive.java | 12 +-
.../common/network/SaslChannelBuilder.java | 6 +-
.../apache/kafka/common/network/Selector.java | 131 +++++++++++++--
.../kafka/common/network/SslChannelBuilder.java | 6 +-
.../kafka/common/network/SslTransportLayer.java | 5 +
.../kafka/common/network/TransportLayer.java | 6 +-
.../kafka/common/protocol/ProtoUtils.java | 47 ++++++
.../apache/kafka/common/protocol/Protocol.java | 47 ++++++
.../kafka/common/protocol/SchemaVisitor.java | 27 +++
.../common/protocol/SchemaVisitorAdapter.java | 38 +++++
.../kafka/common/protocol/types/Field.java | 5 +
.../kafka/common/protocol/types/Schema.java | 4 +-
.../org/apache/kafka/common/utils/Utils.java | 33 +++-
.../memory/GarbageCollectedMemoryPoolTest.java | 163 ++++++++++++++++++
.../kafka/common/network/NioEchoServer.java | 5 +-
.../kafka/common/network/PlaintextSender.java | 44 +++++
.../kafka/common/network/SelectorTest.java | 92 +++++++++-
.../kafka/common/network/SslSelectorTest.java | 95 ++++++++++-
.../apache/kafka/common/network/SslSender.java | 83 +++++++++
.../common/network/SslTransportLayerTest.java | 3 +-
.../kafka/common/protocol/ProtoUtilsTest.java | 35 ++++
.../apache/kafka/common/utils/UtilsTest.java | 12 ++
.../scala/kafka/network/RequestChannel.scala | 25 ++-
.../main/scala/kafka/network/SocketServer.scala | 33 +++-
.../main/scala/kafka/server/KafkaConfig.scala | 7 +
.../kafka/server/KafkaRequestHandler.scala | 7 +-
.../unit/kafka/network/SocketServerTest.scala | 5 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 3 +-
.../unit/kafka/server/KafkaConfigTest.scala | 1 +
38 files changed, 1446 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 1579a1c..4bd907b 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -43,6 +43,7 @@
<allow pkg="org.apache.kafka.common.security" />
<allow pkg="org.apache.kafka.common.utils" />
<allow pkg="org.apache.kafka.common.errors" exact-match="true" />
+ <allow pkg="org.apache.kafka.common.memory" />
<subpackage name="common">
<disallow pkg="org.apache.kafka.clients" />
@@ -67,6 +68,10 @@
<allow pkg="org.apache.kafka.common.metrics" />
</subpackage>
+ <subpackage name="memory">
+ <allow pkg="org.apache.kafka.common.metrics" />
+ </subpackage>
+
<subpackage name="network">
<allow pkg="org.apache.kafka.common.security.auth" />
<allow pkg="org.apache.kafka.common.protocol" />
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPool.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPool.java b/clients/src/main/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPool.java
new file mode 100644
index 0000000..041d1c2
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPool.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.memory;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Utils;
+
+import java.lang.ref.ReferenceQueue;
+import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * An extension of SimpleMemoryPool that tracks allocated buffers and logs an error when they "leak"
+ * (when they are garbage-collected without having been release()ed).
+ * THIS IMPLEMENTATION IS A DEVELOPMENT/DEBUGGING AID AND IS NOT MEANT FOR PRODUCTION USE.
+ */
+public class GarbageCollectedMemoryPool extends SimpleMemoryPool implements AutoCloseable {
+
+ private final ReferenceQueue<ByteBuffer> garbageCollectedBuffers = new ReferenceQueue<>();
+ //serves 2 purposes - 1st it maintains the ref objects reachable (which is a requirement for them
+ //to ever be enqueued), 2nd keeps some (small) metadata for every buffer allocated
+ private final Map<BufferReference, BufferMetadata> buffersInFlight = new ConcurrentHashMap<>();
+ private final GarbageCollectionListener gcListener = new GarbageCollectionListener();
+ private final Thread gcListenerThread;
+ private volatile boolean alive = true;
+
+ public GarbageCollectedMemoryPool(long sizeBytes, int maxSingleAllocationSize, boolean strict, Sensor oomPeriodSensor) {
+ super(sizeBytes, maxSingleAllocationSize, strict, oomPeriodSensor);
+ this.alive = true;
+ this.gcListenerThread = new Thread(gcListener, "memory pool GC listener");
+ this.gcListenerThread.setDaemon(true); //so we dont need to worry about shutdown
+ this.gcListenerThread.start();
+ }
+
+ /**
+ * Registers a freshly allocated buffer as "in flight" before it is handed to the caller.
+ * A weak reference to the buffer (bound to the GC reference queue) is stored together with
+ * the buffer's capacity, so the size is still known if the buffer is later garbage-collected
+ * without having been release()ed.
+ * @param justAllocated the buffer about to be returned from tryAllocate()
+ * @throws IllegalStateException if a reference equal to the new one is already registered
+ */
+ @Override
+ protected void bufferToBeReturned(ByteBuffer justAllocated) {
+ BufferReference ref = new BufferReference(justAllocated, garbageCollectedBuffers);
+ BufferMetadata metadata = new BufferMetadata(justAllocated.capacity());
+ if (buffersInFlight.put(ref, metadata) != null)
+ //this is a bug. it means either 2 different co-existing buffers got
+ //the same identity or we failed to register a released/GC'ed buffer
+ throw new IllegalStateException("allocated buffer identity " + ref.hashCode + " already registered as in use?!");
+
+ //log the size of this buffer (metadata.sizeBytes), not the inherited sizeBytes field,
+ //which is the total capacity of the pool
+ log.trace("allocated buffer of size {} and identity {}", metadata.sizeBytes, ref.hashCode);
+ }
+
+ /**
+ * Unregisters a buffer that is being returned to the pool, validating that it was
+ * handed out by this pool and that its capacity matches what was recorded at allocation.
+ * @param justReleased the buffer passed to release()
+ * @throws IllegalArgumentException if no in-flight record exists for the buffer
+ * @throws IllegalStateException if the buffer's capacity differs from the recorded size
+ */
+ @Override
+ protected void bufferToBeReleased(ByteBuffer justReleased) {
+ BufferReference ref = new BufferReference(justReleased); //used for lookup only
+ BufferMetadata metadata = buffersInFlight.remove(ref);
+ if (metadata == null)
+ //its impossible for the buffer to have already been GC'ed (because we have a hard ref to it
+ //in the function arg) so this means either a double free or not our buffer.
+ throw new IllegalArgumentException("returned buffer " + ref.hashCode + " was never allocated by this pool");
+ if (metadata.sizeBytes != justReleased.capacity()) {
+ //this is a bug
+ throw new IllegalStateException("buffer " + ref.hashCode + " has capacity " + justReleased.capacity() + " but recorded as " + metadata.sizeBytes);
+ }
+ log.trace("released buffer of size {} and identity {}", metadata.sizeBytes, ref.hashCode);
+ }
+
+ @Override
+ public void close() throws Exception {
+ alive = false;
+ gcListenerThread.interrupt();
+ }
+
+ private class GarbageCollectionListener implements Runnable {
+ @Override
+ public void run() {
+ while (alive) {
+ try {
+ BufferReference ref = (BufferReference) garbageCollectedBuffers.remove(); //blocks
+ ref.clear();
+ //this cannot race with a release() call because an object is either reachable or not,
+ //release() can only happen before its GC'ed, and enqueue can only happen after.
+ //if the ref was enqueued it must then not have been released
+ BufferMetadata metadata = buffersInFlight.remove(ref);
+
+ if (metadata == null) {
+ //it can happen rarely that the buffer was release()ed properly (so no metadata) and yet
+ //the reference object to it remains reachable for a short period of time after release()
+ //and hence gets enqueued. this is because we keep refs in a ConcurrentHashMap which cleans
+ //up keys lazily.
+ continue;
+ }
+
+ availableMemory.addAndGet(metadata.sizeBytes);
+ log.error("Reclaimed buffer of size {} and identity {} that was not properly release()ed. This is a bug.", metadata.sizeBytes, ref.hashCode);
+ } catch (InterruptedException e) {
+ log.debug("interrupted", e);
+ //ignore, we're a daemon thread
+ }
+ }
+ log.info("GC listener shutting down");
+ }
+ }
+
+ private static final class BufferMetadata {
+ private final int sizeBytes;
+
+ private BufferMetadata(int sizeBytes) {
+ this.sizeBytes = sizeBytes;
+ }
+ }
+
+ private static final class BufferReference extends WeakReference<ByteBuffer> {
+ private final int hashCode;
+
+ private BufferReference(ByteBuffer referent) { //used for lookup purposes only - no queue required.
+ this(referent, null);
+ }
+
+ private BufferReference(ByteBuffer referent, ReferenceQueue<? super ByteBuffer> q) {
+ super(referent, q);
+ hashCode = System.identityHashCode(referent);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) { //this is important to find leaked buffers (by ref identity)
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ BufferReference that = (BufferReference) o;
+ if (hashCode != that.hashCode) {
+ return false;
+ }
+ ByteBuffer thisBuf = get();
+ if (thisBuf == null) {
+ //our buffer has already been GC'ed, yet "that" is not us. so not same buffer
+ return false;
+ }
+ ByteBuffer thatBuf = that.get();
+ return thisBuf == thatBuf;
+ }
+
+ @Override
+ public int hashCode() {
+ return hashCode;
+ }
+ }
+
+ @Override
+ public String toString() {
+ long allocated = sizeBytes - availableMemory.get();
+ return "GarbageCollectedMemoryPool{" + Utils.formatBytes(allocated) + "/" + Utils.formatBytes(sizeBytes) + " used in " + buffersInFlight.size() + " buffers}";
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/memory/MemoryPool.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/memory/MemoryPool.java b/clients/src/main/java/org/apache/kafka/common/memory/MemoryPool.java
new file mode 100644
index 0000000..5887816
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/memory/MemoryPool.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.memory;
+
+import java.nio.ByteBuffer;
+
+
+/**
+ * A common memory pool interface for non-blocking pools.
+ * Every buffer returned from {@link #tryAllocate(int)} must always be {@link #release(ByteBuffer) released}.
+ */
+public interface MemoryPool {
+ MemoryPool NONE = new MemoryPool() {
+ @Override
+ public ByteBuffer tryAllocate(int sizeBytes) {
+ return ByteBuffer.allocate(sizeBytes);
+ }
+
+ @Override
+ public void release(ByteBuffer previouslyAllocated) {
+ //nop
+ }
+
+ @Override
+ public long size() {
+ return Long.MAX_VALUE;
+ }
+
+ @Override
+ public long availableMemory() {
+ return Long.MAX_VALUE;
+ }
+
+ @Override
+ public boolean isOutOfMemory() {
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return "NONE";
+ }
+ };
+
+ /**
+ * Tries to acquire a ByteBuffer of the specified size
+ * @param sizeBytes size required
+ * @return a ByteBuffer (which later needs to be release()ed), or null if no memory available.
+ * the buffer will be of the exact size requested, even if backed by a larger chunk of memory
+ */
+ ByteBuffer tryAllocate(int sizeBytes);
+
+ /**
+ * Returns a previously allocated buffer to the pool.
+ * @param previouslyAllocated a buffer previously returned from tryAllocate()
+ */
+ void release(ByteBuffer previouslyAllocated);
+
+ /**
+ * Returns the total size of this pool
+ * @return total size, in bytes
+ */
+ long size();
+
+ /**
+ * Returns the amount of memory available for allocation by this pool.
+ * NOTE: result may be negative (pools may over allocate to avoid starvation issues)
+ * @return bytes available
+ */
+ long availableMemory();
+
+ /**
+ * Returns true if the pool cannot currently allocate any more buffers
+ * - meaning total outstanding buffers meets or exceeds pool size and
+ * some would need to be released before further allocations are possible.
+ *
+ * This is equivalent to availableMemory() <= 0
+ * @return true if out of memory
+ */
+ boolean isOutOfMemory();
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/memory/SimpleMemoryPool.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/memory/SimpleMemoryPool.java b/clients/src/main/java/org/apache/kafka/common/memory/SimpleMemoryPool.java
new file mode 100644
index 0000000..f1ab8f7
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/memory/SimpleMemoryPool.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.memory;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * a simple pool implementation. this implementation just provides a limit on the total outstanding memory.
+ * any buffer allocated must be release()ed always otherwise memory is not marked as reclaimed (and "leak"s)
+ */
+public class SimpleMemoryPool implements MemoryPool {
+ protected final Logger log = LoggerFactory.getLogger(getClass()); //subclass-friendly
+
+ protected final long sizeBytes;
+ protected final boolean strict;
+ protected final AtomicLong availableMemory;
+ protected final int maxSingleAllocationSize;
+ protected final AtomicLong startOfNoMemPeriod = new AtomicLong(); //nanoseconds
+ protected volatile Sensor oomTimeSensor;
+
+ /**
+ * Creates a pool that bounds the total amount of outstanding (allocated but not yet released) memory.
+ * @param sizeInBytes total pool size in bytes, must be positive
+ * @param maxSingleAllocationBytes largest single allocation allowed, must be positive and no larger than sizeInBytes
+ * @param strict if true an allocation succeeds only when the full requested size is available,
+ * otherwise it succeeds whenever any memory is available
+ * @param oomPeriodSensor sensor that records how long the pool was refusing allocations, may be null
+ * @throws IllegalArgumentException if the size constraints above are violated
+ */
+ public SimpleMemoryPool(long sizeInBytes, int maxSingleAllocationBytes, boolean strict, Sensor oomPeriodSensor) {
+ if (sizeInBytes <= 0 || maxSingleAllocationBytes <= 0 || maxSingleAllocationBytes > sizeInBytes)
+ //message matches the check (equality is allowed) and keeps a space between the two sentences
+ throw new IllegalArgumentException("must provide a positive size and max single allocation size smaller than or equal to size. "
+ + "provided " + sizeInBytes + " and " + maxSingleAllocationBytes + " respectively");
+ this.sizeBytes = sizeInBytes;
+ this.strict = strict;
+ this.availableMemory = new AtomicLong(sizeInBytes);
+ this.maxSingleAllocationSize = maxSingleAllocationBytes;
+ this.oomTimeSensor = oomPeriodSensor;
+ }
+
+ /**
+ * Attempts to reserve sizeBytes from the pool and, on success, returns a newly allocated heap buffer.
+ * The reservation is made with a compare-and-set loop on availableMemory, so this method never blocks.
+ * @param sizeBytes requested size, must be between 1 and maxSingleAllocationSize (inclusive)
+ * @return a buffer of the requested size, or null if the pool refused the allocation
+ * @throws IllegalArgumentException if sizeBytes is below 1 or above maxSingleAllocationSize
+ */
+ @Override
+ public ByteBuffer tryAllocate(int sizeBytes) {
+ if (sizeBytes < 1)
+ throw new IllegalArgumentException("requested size " + sizeBytes + "<=0");
+ if (sizeBytes > maxSingleAllocationSize)
+ throw new IllegalArgumentException("requested size " + sizeBytes + " is larger than maxSingleAllocationSize " + maxSingleAllocationSize);
+
+ long available;
+ boolean success = false;
+ //in strict mode we will only allocate memory if we have at least the size required.
+ //in non-strict mode we will allocate memory if we have _any_ memory available (so available memory
+ //can dip into the negative and max allocated memory would be sizeBytes + maxSingleAllocationSize)
+ long threshold = strict ? sizeBytes : 1;
+ while ((available = availableMemory.get()) >= threshold) {
+ success = availableMemory.compareAndSet(available, available - sizeBytes);
+ if (success)
+ break;
+ }
+
+ if (success) {
+ maybeRecordEndOfDrySpell();
+ } else {
+ if (oomTimeSensor != null) {
+ //only the first refusal of a dry spell sets the start timestamp (CAS from 0)
+ startOfNoMemPeriod.compareAndSet(0, System.nanoTime());
+ }
+ log.trace("refused to allocate buffer of size {}", sizeBytes);
+ return null;
+ }
+
+ ByteBuffer allocated = ByteBuffer.allocate(sizeBytes);
+ bufferToBeReturned(allocated);
+ return allocated;
+ }
+
+ @Override
+ public void release(ByteBuffer previouslyAllocated) {
+ if (previouslyAllocated == null)
+ throw new IllegalArgumentException("provided null buffer");
+
+ bufferToBeReleased(previouslyAllocated);
+ availableMemory.addAndGet(previouslyAllocated.capacity());
+ maybeRecordEndOfDrySpell();
+ }
+
+ @Override
+ public long size() {
+ return sizeBytes;
+ }
+
+ @Override
+ public long availableMemory() {
+ return availableMemory.get();
+ }
+
+ @Override
+ public boolean isOutOfMemory() {
+ return availableMemory.get() <= 0;
+ }
+
+ //allows subclasses to do their own bookkeeping (and validation) _before_ memory is returned to client code.
+ protected void bufferToBeReturned(ByteBuffer justAllocated) {
+ log.trace("allocated buffer of size {} ", justAllocated.capacity());
+ }
+
+ //allows subclasses to do their own bookkeeping (and validation) _before_ memory is marked as reclaimed.
+ protected void bufferToBeReleased(ByteBuffer justReleased) {
+ log.trace("released buffer of size {}", justReleased.capacity());
+ }
+
+ @Override
+ public String toString() {
+ long allocated = sizeBytes - availableMemory.get();
+ return "SimpleMemoryPool{" + Utils.formatBytes(allocated) + "/" + Utils.formatBytes(sizeBytes) + " used}";
+ }
+
+ protected void maybeRecordEndOfDrySpell() {
+ if (oomTimeSensor != null) {
+ long startOfDrySpell = startOfNoMemPeriod.getAndSet(0);
+ if (startOfDrySpell != 0) {
+ //how long were we refusing allocation requests for
+ oomTimeSensor.record((System.nanoTime() - startOfDrySpell) / 1000000.0); //fractional (double) millis
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java
index 0e5ca78..6c8890a 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java
@@ -20,6 +20,8 @@ import java.util.Map;
import java.nio.channels.SelectionKey;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.memory.MemoryPool;
+
/**
* A ChannelBuilder interface to build Channel based on configs
@@ -36,10 +38,11 @@ public interface ChannelBuilder extends AutoCloseable {
* returns a Channel with TransportLayer and Authenticator configured.
* @param id channel id
* @param key SelectionKey
- * @param maxReceiveSize
+ * @param maxReceiveSize max size of a single receive buffer to allocate
+ * @param memoryPool memory pool from which to allocate buffers, or null for none
* @return KafkaChannel
*/
- KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException;
+ KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException;
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
index b563c4a..1e76c43 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -25,6 +25,8 @@ import java.nio.channels.SelectionKey;
import java.security.Principal;
+import java.util.Objects;
+import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.utils.Utils;
public class KafkaChannel {
@@ -35,6 +37,7 @@ public class KafkaChannel {
// The values are read and reset after each response is sent.
private long networkThreadTimeNanos;
private final int maxReceiveSize;
+ private final MemoryPool memoryPool;
private NetworkReceive receive;
private Send send;
// Track connection and mute state of channels to enable outstanding requests on channels to be
@@ -43,12 +46,13 @@ public class KafkaChannel {
private boolean muted;
private ChannelState state;
- public KafkaChannel(String id, TransportLayer transportLayer, Authenticator authenticator, int maxReceiveSize) throws IOException {
+ public KafkaChannel(String id, TransportLayer transportLayer, Authenticator authenticator, int maxReceiveSize, MemoryPool memoryPool) throws IOException {
this.id = id;
this.transportLayer = transportLayer;
this.authenticator = authenticator;
this.networkThreadTimeNanos = 0L;
this.maxReceiveSize = maxReceiveSize;
+ this.memoryPool = memoryPool;
this.disconnected = false;
this.muted = false;
this.state = ChannelState.NOT_CONNECTED;
@@ -56,7 +60,7 @@ public class KafkaChannel {
public void close() throws IOException {
this.disconnected = true;
- Utils.closeAll(transportLayer, authenticator);
+ Utils.closeAll(transportLayer, authenticator, receive);
}
/**
@@ -106,13 +110,16 @@ public class KafkaChannel {
return id;
}
- public void mute() {
+ /**
+ * externally muting a channel should be done via selector to ensure proper state handling
+ */
+ void mute() {
if (!disconnected)
transportLayer.removeInterestOps(SelectionKey.OP_READ);
muted = true;
}
- public void unmute() {
+ void unmute() {
if (!disconnected)
transportLayer.addInterestOps(SelectionKey.OP_READ);
muted = false;
@@ -125,6 +132,17 @@ public class KafkaChannel {
return muted;
}
+ public boolean isInMutableState() {
+ //some requests do not require memory, so if we do not know what the current (or future) request is
+ //(receive == null) we dont mute. we also dont mute if whatever memory required has already been
+ //successfully allocated (if none is required for the currently-being-read request
+ //receive.memoryAllocated() is expected to return true)
+ if (receive == null || receive.memoryAllocated())
+ return false;
+ //also cannot mute if underlying transport is not in the ready state
+ return transportLayer.ready();
+ }
+
public boolean ready() {
return transportLayer.ready() && authenticator.complete();
}
@@ -161,7 +179,7 @@ public class KafkaChannel {
NetworkReceive result = null;
if (receive == null) {
- receive = new NetworkReceive(maxReceiveSize, id);
+ receive = new NetworkReceive(maxReceiveSize, id, memoryPool);
}
receive(receive);
@@ -169,6 +187,9 @@ public class KafkaChannel {
receive.payload().rewind();
result = receive;
receive = null;
+ } else if (receive.requiredMemoryAmountKnown() && !receive.memoryAllocated() && isInMutableState()) {
+ //pool must be out of memory, mute ourselves.
+ mute();
}
return result;
}
@@ -210,4 +231,28 @@ public class KafkaChannel {
return send.completed();
}
+
+ /**
+ * @return true if underlying transport has bytes remaining to be read from any underlying intermediate buffers.
+ */
+ public boolean hasBytesBuffered() {
+ return transportLayer.hasBytesBuffered();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ KafkaChannel that = (KafkaChannel) o;
+ return Objects.equals(id, that.id);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id);
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
index 582f064..564fbcd 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
@@ -21,6 +21,9 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.ScatteringByteChannel;
+import org.apache.kafka.common.memory.MemoryPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A size delimited Receive that consists of a 4 byte network-ordered size N followed by N bytes of content
@@ -29,10 +32,14 @@ public class NetworkReceive implements Receive {
public final static String UNKNOWN_SOURCE = "";
public final static int UNLIMITED = -1;
+ private static final Logger log = LoggerFactory.getLogger(NetworkReceive.class);
+ private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
private final String source;
private final ByteBuffer size;
private final int maxSize;
+ private final MemoryPool memoryPool;
+ private int requestedBufferSize = -1;
private ByteBuffer buffer;
@@ -41,6 +48,7 @@ public class NetworkReceive implements Receive {
this.buffer = buffer;
this.size = null;
this.maxSize = UNLIMITED;
+ this.memoryPool = MemoryPool.NONE;
}
public NetworkReceive(String source) {
@@ -48,6 +56,7 @@ public class NetworkReceive implements Receive {
this.size = ByteBuffer.allocate(4);
this.buffer = null;
this.maxSize = UNLIMITED;
+ this.memoryPool = MemoryPool.NONE;
}
public NetworkReceive(int maxSize, String source) {
@@ -55,6 +64,15 @@ public class NetworkReceive implements Receive {
this.size = ByteBuffer.allocate(4);
this.buffer = null;
this.maxSize = maxSize;
+ this.memoryPool = MemoryPool.NONE;
+ }
+
+ public NetworkReceive(int maxSize, String source, MemoryPool memoryPool) {
+ this.source = source;
+ this.size = ByteBuffer.allocate(4);
+ this.buffer = null;
+ this.maxSize = maxSize;
+ this.memoryPool = memoryPool;
}
public NetworkReceive() {
@@ -68,13 +86,32 @@ public class NetworkReceive implements Receive {
@Override
public boolean complete() {
- return !size.hasRemaining() && !buffer.hasRemaining();
+ return !size.hasRemaining() && buffer != null && !buffer.hasRemaining();
}
public long readFrom(ScatteringByteChannel channel) throws IOException {
return readFromReadableChannel(channel);
}
+ @Override
+ public boolean requiredMemoryAmountKnown() {
+ return requestedBufferSize != -1;
+ }
+
+ @Override
+ public boolean memoryAllocated() {
+ return buffer != null;
+ }
+
+
+ @Override
+ public void close() throws IOException {
+ if (buffer != null && buffer != EMPTY_BUFFER) {
+ memoryPool.release(buffer);
+ buffer = null;
+ }
+ }
+
// Need a method to read from ReadableByteChannel because BlockingChannel requires read with timeout
// See: http://stackoverflow.com/questions/2866557/timeout-for-socketchannel-doesnt-work
// This can go away after we get rid of BlockingChannel
@@ -93,10 +130,17 @@ public class NetworkReceive implements Receive {
throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
if (maxSize != UNLIMITED && receiveSize > maxSize)
throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
-
- this.buffer = ByteBuffer.allocate(receiveSize);
+ requestedBufferSize = receiveSize; //may be 0 for some payloads (SASL)
+ if (receiveSize == 0) {
+ buffer = EMPTY_BUFFER;
+ }
}
}
+ if (buffer == null && requestedBufferSize != -1) { //we know the size we want but haven't been able to allocate it yet
+ buffer = memoryPool.tryAllocate(requestedBufferSize);
+ if (buffer == null)
+ log.trace("Broker low on memory - could not allocate buffer of size {} for source {}", requestedBufferSize, source);
+ }
if (buffer != null) {
int bytesRead = channel.read(buffer);
if (bytesRead < 0)
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
index fb4f6ba..ad63671 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.network;
import java.nio.channels.SelectionKey;
import java.util.Map;
+import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.security.auth.PrincipalBuilder;
import org.apache.kafka.common.KafkaException;
@@ -39,12 +40,13 @@ public class PlaintextChannelBuilder implements ChannelBuilder {
}
}
- public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException {
+ @Override
+ public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException {
try {
PlaintextTransportLayer transportLayer = new PlaintextTransportLayer(key);
Authenticator authenticator = new DefaultAuthenticator();
authenticator.configure(transportLayer, this.principalBuilder, this.configs);
- return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
+ return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize, memoryPool != null ? memoryPool : MemoryPool.NONE);
} catch (Exception e) {
log.warn("Failed to create channel due to ", e);
throw new KafkaException(e);
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
index 871b7ac..11c9565 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
@@ -214,6 +214,11 @@ public class PlaintextTransportLayer implements TransportLayer {
}
@Override
+ public boolean hasBytesBuffered() {
+ return false;
+ }
+
+ @Override
public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
return fileChannel.transferTo(position, count, socketChannel);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/network/Receive.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Receive.java b/clients/src/main/java/org/apache/kafka/common/network/Receive.java
index b7bbdb4..3bc2761 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Receive.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Receive.java
@@ -16,13 +16,14 @@
*/
package org.apache.kafka.common.network;
+import java.io.Closeable;
import java.io.IOException;
import java.nio.channels.ScatteringByteChannel;
/**
* This interface models the in-progress reading of data from a channel to a source identified by an integer id
*/
-public interface Receive {
+public interface Receive extends Closeable {
/**
* The numeric id of the source from which we are receiving data.
@@ -42,4 +43,13 @@ public interface Receive {
*/
long readFrom(ScatteringByteChannel channel) throws IOException;
+ /**
+ * Do we know yet how much memory is required to fully read this receive?
+ */
+ boolean requiredMemoryAmountKnown();
+
+ /**
+ * Has the underlying memory required to complete reading been allocated yet?
+ */
+ boolean memoryAllocated();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index 33a9f4d..342592b 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
import org.apache.kafka.common.security.authenticator.CredentialCache;
@@ -100,7 +101,8 @@ public class SaslChannelBuilder implements ChannelBuilder {
}
}
- public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException {
+ @Override
+ public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException {
try {
SocketChannel socketChannel = (SocketChannel) key.channel();
TransportLayer transportLayer = buildTransportLayer(id, key, socketChannel);
@@ -114,7 +116,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
socketChannel.socket().getInetAddress().getHostName(), clientSaslMechanism, handshakeRequestEnable);
// Both authenticators don't use `PrincipalBuilder`, so we pass `null` for now. Reconsider if this changes.
authenticator.configure(transportLayer, null, this.configs);
- return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
+ return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize, memoryPool != null ? memoryPool : MemoryPool.NONE);
} catch (Exception e) {
log.info("Failed to create channel due to ", e);
throw new KafkaException(e);
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index da3de80..38226f9 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -26,6 +26,7 @@ import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;
import java.util.ArrayDeque;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
@@ -38,6 +39,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.MetricName;
@@ -87,11 +89,14 @@ public class Selector implements Selectable, AutoCloseable {
private final java.nio.channels.Selector nioSelector;
private final Map<String, KafkaChannel> channels;
+ private final Set<KafkaChannel> explicitlyMutedChannels;
+ private boolean outOfMemory;
private final List<Send> completedSends;
private final List<NetworkReceive> completedReceives;
private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;
private final Set<SelectionKey> immediatelyConnectedKeys;
private final Map<String, KafkaChannel> closingChannels;
+ private Set<SelectionKey> keysWithBufferedRead;
private final Map<String, ChannelState> disconnected;
private final List<String> connected;
private final List<String> failedSends;
@@ -101,6 +106,11 @@ public class Selector implements Selectable, AutoCloseable {
private final int maxReceiveSize;
private final boolean recordTimePerConnection;
private final IdleExpiryManager idleExpiryManager;
+ private final MemoryPool memoryPool;
+ private final long lowMemThreshold;
+ //indicates if the previous call to poll was able to make progress in reading already-buffered data.
+ //this is used to prevent tight loops when memory is not available to read any more data
+ private boolean madeReadProgressLastPoll = true;
/**
* Create a new nioSelector
@@ -122,7 +132,8 @@ public class Selector implements Selectable, AutoCloseable {
Map<String, String> metricTags,
boolean metricsPerConnection,
boolean recordTimePerConnection,
- ChannelBuilder channelBuilder) {
+ ChannelBuilder channelBuilder,
+ MemoryPool memoryPool) {
try {
this.nioSelector = java.nio.channels.Selector.open();
} catch (IOException e) {
@@ -131,11 +142,14 @@ public class Selector implements Selectable, AutoCloseable {
this.maxReceiveSize = maxReceiveSize;
this.time = time;
this.channels = new HashMap<>();
+ this.explicitlyMutedChannels = new HashSet<>();
+ this.outOfMemory = false;
this.completedSends = new ArrayList<>();
this.completedReceives = new ArrayList<>();
this.stagedReceives = new HashMap<>();
this.immediatelyConnectedKeys = new HashSet<>();
this.closingChannels = new HashMap<>();
+ this.keysWithBufferedRead = new HashSet<>();
this.connected = new ArrayList<>();
this.disconnected = new HashMap<>();
this.failedSends = new ArrayList<>();
@@ -143,6 +157,8 @@ public class Selector implements Selectable, AutoCloseable {
this.channelBuilder = channelBuilder;
this.recordTimePerConnection = recordTimePerConnection;
this.idleExpiryManager = connectionMaxIdleMs < 0 ? null : new IdleExpiryManager(time, connectionMaxIdleMs);
+ this.memoryPool = memoryPool;
+ this.lowMemThreshold = (long) (0.1 * this.memoryPool.size());
}
public Selector(int maxReceiveSize,
@@ -153,7 +169,7 @@ public class Selector implements Selectable, AutoCloseable {
Map<String, String> metricTags,
boolean metricsPerConnection,
ChannelBuilder channelBuilder) {
- this(maxReceiveSize, connectionMaxIdleMs, metrics, time, metricGrpPrefix, metricTags, metricsPerConnection, false, channelBuilder);
+ this(maxReceiveSize, connectionMaxIdleMs, metrics, time, metricGrpPrefix, metricTags, metricsPerConnection, false, channelBuilder, MemoryPool.NONE);
}
public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, ChannelBuilder channelBuilder) {
@@ -200,7 +216,7 @@ public class Selector implements Selectable, AutoCloseable {
SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
KafkaChannel channel;
try {
- channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
+ channel = channelBuilder.buildChannel(id, key, maxReceiveSize, memoryPool);
} catch (Exception e) {
try {
socketChannel.close();
@@ -227,7 +243,7 @@ public class Selector implements Selectable, AutoCloseable {
*/
public void register(String id, SocketChannel socketChannel) throws ClosedChannelException {
SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_READ);
- KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
+ KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize, memoryPool);
key.attach(channel);
this.channels.put(id, channel);
}
@@ -311,20 +327,46 @@ public class Selector implements Selectable, AutoCloseable {
if (timeout < 0)
throw new IllegalArgumentException("timeout should be >= 0");
+ boolean madeReadProgressLastCall = madeReadProgressLastPoll;
clear();
- if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
+ boolean dataInBuffers = !keysWithBufferedRead.isEmpty();
+
+ if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty() || (madeReadProgressLastCall && dataInBuffers))
timeout = 0;
+ if (!memoryPool.isOutOfMemory() && outOfMemory) {
+ //we have recovered from memory pressure. unmute any channel not explicitly muted for other reasons
+ log.trace("Broker no longer low on memory - unmuting incoming sockets");
+ for (KafkaChannel channel : channels.values()) {
+ if (channel.isInMutableState() && !explicitlyMutedChannels.contains(channel)) {
+ channel.unmute();
+ }
+ }
+ outOfMemory = false;
+ }
+
/* check ready keys */
long startSelect = time.nanoseconds();
- int readyKeys = select(timeout);
+ int numReadyKeys = select(timeout);
long endSelect = time.nanoseconds();
this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
- if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
- pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
+ if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
+ Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();
+ keysWithBufferedRead.removeAll(readyKeys); //so no channel gets polled twice
+
+ //poll from channels that have buffered data (but nothing more from the underlying socket)
+ if (!keysWithBufferedRead.isEmpty()) {
+ Set<SelectionKey> toPoll = keysWithBufferedRead;
+ keysWithBufferedRead = new HashSet<>(); //poll() calls will repopulate if needed
+ pollSelectionKeys(toPoll, false, endSelect);
+ }
+ //poll from channels where the underlying socket has more data
+ pollSelectionKeys(readyKeys, false, endSelect);
pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
+ } else {
+ madeReadProgressLastPoll = true; //no work is also "progress"
}
long endIo = time.nanoseconds();
@@ -339,10 +381,16 @@ public class Selector implements Selectable, AutoCloseable {
addToCompletedReceives();
}
- private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
+ /**
+ * handle any ready I/O on a set of selection keys
+ * @param selectionKeys set of keys to handle
+ * @param isImmediatelyConnected true if running over a set of keys for just-connected sockets
+ * @param currentTimeNanos time at which set of keys was determined
+ */
+ private void pollSelectionKeys(Set<SelectionKey> selectionKeys,
boolean isImmediatelyConnected,
long currentTimeNanos) {
- Iterator<SelectionKey> iterator = selectionKeys.iterator();
+ Iterator<SelectionKey> iterator = determineHandlingOrder(selectionKeys).iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
@@ -372,14 +420,18 @@ public class Selector implements Selectable, AutoCloseable {
}
/* if channel is not ready finish prepare */
- if (channel.isConnected() && !channel.ready())
+ if (channel.isConnected() && !channel.ready()) {
channel.prepare();
+ }
+
+ attemptRead(key, channel);
- /* if channel is ready read from any connections that have readable data */
- if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
- NetworkReceive networkReceive;
- while ((networkReceive = channel.read()) != null)
- addToStagedReceives(channel, networkReceive);
+ if (channel.hasBytesBuffered()) {
+ //this channel has bytes enqueued in intermediary buffers that we could not read
+ //(possibly because no memory was available). the underlying socket may not
+ //come up as ready in the next poll(), so we need to remember this channel for the
+ //next poll call, otherwise data may be stuck in said buffers forever.
+ keysWithBufferedRead.add(key);
}
/* if channel is ready write to any sockets that have space in their buffer and for which we have data */
@@ -408,6 +460,39 @@ public class Selector implements Selectable, AutoCloseable {
}
}
+ private Collection<SelectionKey> determineHandlingOrder(Set<SelectionKey> selectionKeys) {
+ //it is possible that the iteration order over selectionKeys is the same every invocation.
+ //this may cause starvation of reads when memory is low. to address this we shuffle the keys if memory is low.
+ Collection<SelectionKey> inHandlingOrder;
+
+ if (!outOfMemory && memoryPool.availableMemory() < lowMemThreshold) {
+ List<SelectionKey> temp = new ArrayList<>(selectionKeys);
+ Collections.shuffle(temp);
+ inHandlingOrder = temp;
+ } else {
+ inHandlingOrder = selectionKeys;
+ }
+ return inHandlingOrder;
+ }
+
+ private void attemptRead(SelectionKey key, KafkaChannel channel) throws IOException {
+ //if channel is ready and has bytes to read from socket or buffer, and has no
+ //previous receive(s) already staged or otherwise in progress then read from it
+ if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasStagedReceive(channel)
+ && !explicitlyMutedChannels.contains(channel)) {
+ NetworkReceive networkReceive;
+ while ((networkReceive = channel.read()) != null) {
+ madeReadProgressLastPoll = true;
+ addToStagedReceives(channel, networkReceive);
+ }
+ if (channel.isMute()) {
+ outOfMemory = true; //channel has muted itself due to memory pressure.
+ } else {
+ madeReadProgressLastPoll = true;
+ }
+ }
+ }
+
// Record time spent in pollSelectionKeys for channel (moved into a method to keep checkstyle happy)
private void maybeRecordTimePerConnection(KafkaChannel channel, long startTimeNanos) {
if (recordTimePerConnection)
@@ -442,6 +527,7 @@ public class Selector implements Selectable, AutoCloseable {
private void mute(KafkaChannel channel) {
channel.mute();
+ explicitlyMutedChannels.add(channel);
}
@Override
@@ -451,6 +537,7 @@ public class Selector implements Selectable, AutoCloseable {
}
private void unmute(KafkaChannel channel) {
+ explicitlyMutedChannels.remove(channel);
channel.unmute();
}
@@ -509,6 +596,7 @@ public class Selector implements Selectable, AutoCloseable {
this.disconnected.put(channel, ChannelState.FAILED_SEND);
}
this.failedSends.clear();
+ this.madeReadProgressLastPoll = false;
}
/**
@@ -674,7 +762,7 @@ public class Selector implements Selectable, AutoCloseable {
while (iter.hasNext()) {
Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next();
KafkaChannel channel = entry.getKey();
- if (!channel.isMute()) {
+ if (!explicitlyMutedChannels.contains(channel)) {
Deque<NetworkReceive> deque = entry.getValue();
addToCompletedReceives(channel, deque);
if (deque.isEmpty())
@@ -900,4 +988,13 @@ public class Selector implements Selectable, AutoCloseable {
}
}
+ //package-private for testing
+ boolean isOutOfMemory() {
+ return outOfMemory;
+ }
+
+ //package-private for testing
+ boolean isMadeReadProgressLastPoll() {
+ return madeReadProgressLastPoll;
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
index bc34b70..01b72fe 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
@@ -22,6 +22,7 @@ import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Map;
+import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.security.auth.PrincipalBuilder;
import org.apache.kafka.common.security.ssl.SslFactory;
import org.apache.kafka.common.KafkaException;
@@ -50,12 +51,13 @@ public class SslChannelBuilder implements ChannelBuilder {
}
}
- public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException {
+ @Override
+ public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException {
try {
SslTransportLayer transportLayer = buildTransportLayer(sslFactory, id, key, peerHost(key));
Authenticator authenticator = new DefaultAuthenticator();
authenticator.configure(transportLayer, this.principalBuilder, this.configs);
- return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
+ return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize, memoryPool != null ? memoryPool : MemoryPool.NONE);
} catch (Exception e) {
log.info("Failed to create channel due to ", e);
throw new KafkaException(e);
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
index eb423e3..3cd0114 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
@@ -739,6 +739,11 @@ public class SslTransportLayer implements TransportLayer {
}
@Override
+ public boolean hasBytesBuffered() {
+ return netReadBuffer.position() != 0 || appReadBuffer.position() != 0;
+ }
+
+ @Override
public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
return fileChannel.transferTo(position, count, this);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
index fad0cea..be56ad5 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
@@ -85,6 +85,11 @@ public interface TransportLayer extends ScatteringByteChannel, GatheringByteChan
boolean isMute();
/**
+ * @return true if channel has bytes to be read in any intermediate buffers
+ */
+ boolean hasBytesBuffered();
+
+ /**
* Transfers bytes from `fileChannel` to this `TransportLayer`.
*
* This method will delegate to {@link FileChannel#transferTo(long, long, java.nio.channels.WritableByteChannel)},
@@ -99,5 +104,4 @@ public interface TransportLayer extends ScatteringByteChannel, GatheringByteChan
* @see FileChannel#transferTo(long, long, java.nio.channels.WritableByteChannel)
*/
long transferFrom(FileChannel fileChannel, long position, long count) throws IOException;
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java b/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
new file mode 100644
index 0000000..5d39dff
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Type;
+
+public class ProtoUtils {
+ public static void walk(Schema schema, SchemaVisitor visitor) {
+ if (schema == null || visitor == null) {
+ throw new IllegalArgumentException("Both schema and visitor must be provided");
+ }
+ handleNode(schema, visitor);
+ }
+
+ private static void handleNode(Type node, SchemaVisitor visitor) {
+ if (node instanceof Schema) {
+ Schema schema = (Schema) node;
+ visitor.visit(schema);
+ for (Field f : schema.fields()) {
+ handleNode(f.type, visitor);
+ }
+ } else if (node instanceof ArrayOf) {
+ ArrayOf array = (ArrayOf) node;
+ visitor.visit(array);
+ handleNode(array.type(), visitor);
+ } else {
+ visitor.visit(node);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 329d99b..ee40133 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -21,10 +21,13 @@ import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Type;
+import java.util.EnumSet;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
import static org.apache.kafka.common.protocol.types.Type.BOOLEAN;
import static org.apache.kafka.common.protocol.types.Type.BYTES;
@@ -32,6 +35,7 @@ import static org.apache.kafka.common.protocol.types.Type.INT16;
import static org.apache.kafka.common.protocol.types.Type.INT32;
import static org.apache.kafka.common.protocol.types.Type.INT64;
import static org.apache.kafka.common.protocol.types.Type.INT8;
+import static org.apache.kafka.common.protocol.types.Type.NULLABLE_BYTES;
import static org.apache.kafka.common.protocol.types.Type.RECORDS;
import static org.apache.kafka.common.protocol.types.Type.STRING;
import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING;
@@ -1825,6 +1829,7 @@ public class Protocol {
public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
public static final Schema[][] RESPONSES = new Schema[ApiKeys.MAX_API_KEY + 1][];
static final short[] MIN_VERSIONS = new short[ApiKeys.MAX_API_KEY + 1];
+ static final EnumSet<ApiKeys> DELAYED_DEALLOCATION_REQUESTS; //initialized in static block
/* the latest version of each api */
static final short[] CURR_VERSION = new short[ApiKeys.MAX_API_KEY + 1];
@@ -1924,6 +1929,44 @@ public class Protocol {
throw new IllegalStateException("Request and response for version " + i + " of API "
+ api.id + " are defined inconsistently. One is null while the other is not null.");
}
+
+ /* go over all request schemata and find those that retain links to the underlying ByteBuffer from
+ * which they were read out. knowing which requests do (and do not) retain a reference to the buffer
+ * is needed to enable buffers to be released as soon as possible for requests that no longer need them */
+ Set<ApiKeys> requestsWithBufferRefs = new HashSet<>();
+ for (int reqId = 0; reqId < REQUESTS.length; reqId++) {
+ ApiKeys requestType = ApiKeys.forId(reqId);
+ Schema[] schemata = REQUESTS[reqId];
+ if (schemata == null) {
+ continue;
+ }
+ for (Schema requestVersionSchema : schemata) {
+ if (retainsBufferReference(requestVersionSchema)) {
+ requestsWithBufferRefs.add(requestType);
+ break; //kafka is loose with versions, so if _ANY_ version retains buffers we must assume all do.
+ }
+ }
+ }
+
+ DELAYED_DEALLOCATION_REQUESTS = EnumSet.copyOf(requestsWithBufferRefs);
+ }
+
+ private static boolean retainsBufferReference(Schema schema) {
+ if (schema == null) {
+ return false;
+ }
+ final AtomicReference<Boolean> foundBufferReference = new AtomicReference<>(Boolean.FALSE);
+ SchemaVisitor detector = new SchemaVisitorAdapter() {
+ @Override
+ public void visit(Type field) {
+ if (field == BYTES || field == NULLABLE_BYTES || field == RECORDS) {
+ foundBufferReference.set(Boolean.TRUE);
+ }
+ }
+ };
+ foundBufferReference.set(Boolean.FALSE);
+ ProtoUtils.walk(schema, detector);
+ return foundBufferReference.get();
}
public static boolean apiVersionSupported(short apiKey, short apiVersion) {
@@ -1936,6 +1979,10 @@ public class Protocol {
0);
}
+ public static boolean requiresDelayedDeallocation(int apiKey) {
+ return DELAYED_DEALLOCATION_REQUESTS.contains(ApiKeys.forId(apiKey));
+ }
+
private static String indentString(int size) {
StringBuilder b = new StringBuilder(size);
for (int i = 0; i < size; i++)
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitor.java b/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitor.java
new file mode 100644
index 0000000..e61cc77
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitor.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Type;
+
+/**
+ * Callback interface with one visit method per kind of schema element: plain field types,
+ * arrays and whole schemas. Protocol hands an implementation of it to ProtoUtils.walk.
+ */
+public interface SchemaVisitor {
+    /**
+     * Handles a field type.
+     * NOTE(review): presumably only types that are neither a Schema nor an ArrayOf end up here;
+     * confirm against the traversal code.
+     */
+    void visit(Type field);
+
+    /** Handles an array type. */
+    void visit(ArrayOf array);
+
+    /** Handles a schema. */
+    void visit(Schema schema);
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitorAdapter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitorAdapter.java b/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitorAdapter.java
new file mode 100644
index 0000000..62834d0
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitorAdapter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Type;
+
+/**
+ * Convenience base class for {@link SchemaVisitor} implementations: every callback is implemented
+ * as a no-op, so subclasses override only the callbacks they are interested in.
+ */
+public abstract class SchemaVisitorAdapter implements SchemaVisitor {
+    @Override
+    public void visit(Type field) {
+        // intentionally empty
+    }
+
+    @Override
+    public void visit(ArrayOf array) {
+        // intentionally empty
+    }
+
+    @Override
+    public void visit(Schema schema) {
+        // intentionally empty
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java
index 605174d..29a89d4 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java
@@ -70,4 +70,9 @@ public class Field {
return schema;
}
+
+ @Override
+ public String toString() {
+ return name + ":" + type;
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
index fbb520c..a9c08aa 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
@@ -139,9 +139,7 @@ public class Schema extends Type {
StringBuilder b = new StringBuilder();
b.append('{');
for (int i = 0; i < this.fields.length; i++) {
- b.append(this.fields[i].name);
- b.append(':');
- b.append(this.fields[i].type());
+ b.append(this.fields[i].toString());
if (i < this.fields.length - 1)
b.append(',');
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index e997fef..75f8cf7 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.utils;
+import java.text.DecimalFormat;
import org.apache.kafka.common.KafkaException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,6 +63,11 @@ public class Utils {
// IPv6 is supported with [ip] pattern
private static final Pattern HOST_PORT_PATTERN = Pattern.compile(".*?\\[?([0-9a-zA-Z\\-%._:]*)\\]?:([0-9]+)");
+ // Prints up to 2 decimal digits. Used for human readable printing
+ private static final DecimalFormat TWO_DIGIT_FORMAT = new DecimalFormat("0.##");
+
+ private static final String[] BYTE_SCALE_SUFFIXES = new String[] {"B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB"};
+
public static final String NL = System.getProperty("line.separator");
private static final Logger log = LoggerFactory.getLogger(Utils.class);
@@ -379,6 +385,28 @@ public class Utils {
}
/**
+ * Formats a byte number as a human readable String ("3.2 MB")
+ * @param bytes some size in bytes
+ * @return
+ */
+ public static String formatBytes(long bytes) {
+ if (bytes < 0) {
+ return "" + bytes;
+ }
+ double asDouble = (double) bytes;
+ int ordinal = (int) Math.floor(Math.log(asDouble) / Math.log(1024.0));
+ double scale = Math.pow(1024.0, ordinal);
+ double scaled = asDouble / scale;
+ String formatted = TWO_DIGIT_FORMAT.format(scaled);
+ try {
+ return formatted + " " + BYTE_SCALE_SUFFIXES[ordinal];
+ } catch (IndexOutOfBoundsException e) {
+ //huge number?
+ return "" + asDouble;
+ }
+ }
+
+ /**
* Create a string representation of an array joined by the given separator
* @param strs The array of items
* @param separator The separator
@@ -613,7 +641,7 @@ public class Utils {
} catch (IOException outer) {
try {
Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
- log.debug("Non-atomic move of {} to {} succeeded after atomic move failed due to {}", source, target,
+ log.debug("Non-atomic move of {} to {} succeeded after atomic move failed due to {}", source, target,
outer.getMessage());
} catch (IOException inner) {
inner.addSuppressed(outer);
@@ -632,7 +660,8 @@ public class Utils {
IOException exception = null;
for (Closeable closeable : closeables) {
try {
- closeable.close();
+ if (closeable != null)
+ closeable.close();
} catch (IOException e) {
if (exception != null)
exception.addSuppressed(e);
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/test/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPoolTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPoolTest.java b/clients/src/test/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPoolTest.java
new file mode 100644
index 0000000..788d447
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPoolTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.memory;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.common.utils.Utils;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Unit tests for GarbageCollectedMemoryPool covering constructor validation, rejection of invalid
+ * tryAllocate() and release() arguments, the allocation bound, and recovery of capacity from
+ * buffers that were dropped without ever being released.
+ */
+public class GarbageCollectedMemoryPoolTest {
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testZeroSize() throws Exception {
+        new GarbageCollectedMemoryPool(0, 7, true, null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testNegativeSize() throws Exception {
+        new GarbageCollectedMemoryPool(-1, 7, false, null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testZeroMaxAllocation() throws Exception {
+        new GarbageCollectedMemoryPool(100, 0, true, null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testNegativeMaxAllocation() throws Exception {
+        new GarbageCollectedMemoryPool(100, -1, false, null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testMaxAllocationLargerThanSize() throws Exception {
+        new GarbageCollectedMemoryPool(100, 101, true, null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testAllocationOverMaxAllocation() throws Exception {
+        GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(1000, 10, false, null);
+        pool.tryAllocate(11);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testAllocationZero() throws Exception {
+        GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(1000, 10, true, null);
+        pool.tryAllocate(0);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testAllocationNegative() throws Exception {
+        GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(1000, 10, false, null);
+        pool.tryAllocate(-1);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testReleaseNull() throws Exception {
+        GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(1000, 10, true, null);
+        pool.release(null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testReleaseForeignBuffer() throws Exception {
+        GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(1000, 10, true, null);
+        ByteBuffer fellOffATruck = ByteBuffer.allocate(1);
+        pool.release(fellOffATruck);
+    }
+
+    /**
+     * Releasing the same buffer twice must be rejected with an IllegalArgumentException.
+     */
+    @Test
+    public void testDoubleFree() throws Exception {
+        GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(1000, 10, false, null);
+        ByteBuffer buffer = pool.tryAllocate(5);
+        Assert.assertNotNull(buffer);
+        pool.release(buffer); //so far so good
+        try {
+            pool.release(buffer);
+        } catch (IllegalArgumentException e) {
+            //as expected
+            return;
+        }
+        //kept outside the try block so the AssertionError is reported as is; any other
+        //exception thrown by release() propagates with its own stack trace
+        Assert.fail("2nd release() should have failed");
+    }
+
+    /**
+     * Once the pool is exhausted tryAllocate() must return null until a buffer is released.
+     * NOTE(review): a third 10 byte buffer is expected from a pool created with 21, so presumably
+     * any remaining capacity is enough to satisfy a request; confirm against the pool implementation.
+     */
+    @Test
+    public void testAllocationBound() throws Exception {
+        GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(21, 10, false, null);
+        ByteBuffer buf1 = pool.tryAllocate(10);
+        Assert.assertNotNull(buf1);
+        Assert.assertEquals(10, buf1.capacity());
+        ByteBuffer buf2 = pool.tryAllocate(10);
+        Assert.assertNotNull(buf2);
+        Assert.assertEquals(10, buf2.capacity());
+        ByteBuffer buf3 = pool.tryAllocate(10);
+        Assert.assertNotNull(buf3);
+        Assert.assertEquals(10, buf3.capacity());
+        //no more allocations
+        Assert.assertNull(pool.tryAllocate(1));
+        //release a buffer
+        pool.release(buf3);
+        //now we can have more
+        ByteBuffer buf4 = pool.tryAllocate(10);
+        Assert.assertNotNull(buf4);
+        Assert.assertEquals(10, buf4.capacity());
+        //no more allocations
+        Assert.assertNull(pool.tryAllocate(1));
+    }
+
+    /**
+     * Allocates buffers without ever releasing them and expects the pool to keep serving requests,
+     * which is only possible if dropped buffers are reclaimed and the pool notices it.
+     */
+    @Test
+    public void testBuffersGarbageCollected() throws Exception {
+        Runtime runtime = Runtime.getRuntime();
+        long maxHeap = runtime.maxMemory(); //in bytes
+        long maxPool = maxHeap / 2;
+        long maxSingleAllocation = maxPool / 10;
+        //the allocation size is cast to int below, so it has to stay well within the int range
+        Assert.assertTrue("test JVM running with too much memory for this test logic: max heap " + maxHeap + " bytes",
+                maxSingleAllocation < Integer.MAX_VALUE / 2);
+        GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(maxPool, (int) maxSingleAllocation, false, null);
+
+        //we will allocate 30 buffers from this pool, which is sized such that at-most
+        //11 should coexist and 30 do not fit in the JVM memory, proving that:
+        // 1. buffers were reclaimed and
+        // 2. the pool registered the reclamation.
+
+        int timeoutSeconds = 30;
+        long giveUp = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(timeoutSeconds);
+        boolean success = false;
+
+        int buffersAllocated = 0;
+        while (System.currentTimeMillis() < giveUp) {
+            //the reference is deliberately dropped at the end of every iteration and never released
+            ByteBuffer buffer = pool.tryAllocate((int) maxSingleAllocation);
+            if (buffer == null) {
+                //pool is exhausted: ask for a collection and give it a moment before retrying
+                System.gc();
+                Thread.sleep(10);
+                continue;
+            }
+            buffersAllocated++;
+            if (buffersAllocated >= 30) {
+                success = true;
+                break;
+            }
+        }
+
+        Assert.assertTrue("failed to allocate 30 buffers in " + timeoutSeconds + " seconds."
+                + " buffers allocated: " + buffersAllocated + " heap " + Utils.formatBytes(maxHeap)
+                + " pool " + Utils.formatBytes(maxPool) + " single allocation "
+                + Utils.formatBytes(maxSingleAllocation), success);
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
index 85c5002..fd4ef69 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
@@ -94,14 +94,15 @@ public class NioEchoServer extends Thread {
List<NetworkReceive> completedReceives = selector.completedReceives();
for (NetworkReceive rcv : completedReceives) {
KafkaChannel channel = channel(rcv.source());
- channel.mute();
+ String channelId = channel.id();
+ selector.mute(channelId);
NetworkSend send = new NetworkSend(rcv.source(), rcv.payload());
if (outputChannel == null)
selector.send(send);
else {
for (ByteBuffer buffer : send.buffers)
outputChannel.write(buffer);
- channel.unmute();
+ selector.unmute(channelId);
}
}
for (Send send : selector.completedSends())
http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/test/java/org/apache/kafka/common/network/PlaintextSender.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/PlaintextSender.java b/clients/src/test/java/org/apache/kafka/common/network/PlaintextSender.java
new file mode 100644
index 0000000..3338d03
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/network/PlaintextSender.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+
+/**
+ * Test helper thread: once started it connects a plain socket to the given server address,
+ * writes out the whole payload, flushes and disconnects.
+ */
+public class PlaintextSender extends Thread {
+
+    /**
+     * Creates a daemon thread, named after the payload size and the target address, that sends
+     * the payload when started.
+     *
+     * @param serverAddress address to connect to
+     * @param payload bytes to write to the connection
+     */
+    public PlaintextSender(final InetSocketAddress serverAddress, final byte[] payload) {
+        super(sendTask(serverAddress, payload));
+        setDaemon(true);
+        setName("PlaintextSender - " + payload.length + " bytes @ " + serverAddress);
+    }
+
+    /**
+     * Builds the task run by the thread; any failure is printed to stderr instead of being rethrown.
+     */
+    private static Runnable sendTask(final InetSocketAddress serverAddress, final byte[] payload) {
+        return new Runnable() {
+            @Override
+            public void run() {
+                try (Socket socket = new Socket(serverAddress.getAddress(), serverAddress.getPort());
+                     OutputStream out = socket.getOutputStream()) {
+                    out.write(payload);
+                    out.flush();
+                } catch (Exception failure) {
+                    failure.printStackTrace(System.err);
+                }
+            }
+        };
+    }
+}