Posted to commits@kafka.apache.org by ij...@apache.org on 2017/11/09 16:06:24 UTC

kafka git commit: KAFKA-6185; Remove channels from explicitlyMutedChannels set when closed

Repository: kafka
Updated Branches:
  refs/heads/trunk fe3171844 -> 0653a895f


KAFKA-6185; Remove channels from explicitlyMutedChannels set when closed

This memory leak could eventually lead to an OutOfMemoryError. It
was particularly likely in the case of down conversions, as the
leaked channels would hold on to the record batch (which is only
loaded into the heap when a down conversion takes place).
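
For illustration, the leak pattern can be sketched as a minimal, hypothetical
class (not the actual Selector implementation; the real field and one-line fix
live in org.apache.kafka.common.network.Selector, shown in the diff below):

    import java.util.HashSet;
    import java.util.Set;

    // Hypothetical, simplified sketch: a muted channel that is closed but
    // never removed from the mute set stays strongly reachable, along with
    // any down-converted record batches it references.
    class MuteSetLeakSketch {
        private final Set<Object> explicitlyMutedChannels = new HashSet<>();

        void mute(Object channel) {
            explicitlyMutedChannels.add(channel);
        }

        void close(Object channel) {
            // Before the fix this remove() was missing, so closed channels
            // accumulated in the set and could never be garbage collected.
            explicitlyMutedChannels.remove(channel);
        }
    }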

Author: Rajini Sivaram <ra...@googlemail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #4193 from rajinisivaram/KAFKA-6185-oom


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0653a895
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0653a895
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0653a895

Branch: refs/heads/trunk
Commit: 0653a895f51492a8687aaba0dc7a43b21fca2a56
Parents: fe31718
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Thu Nov 9 16:02:04 2017 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu Nov 9 16:03:24 2017 +0000

----------------------------------------------------------------------
 .../apache/kafka/common/network/Selector.java   |  1 +
 .../kafka/common/network/SelectorTest.java      | 28 ++++++--
 .../unit/kafka/server/BaseRequestTest.scala     | 28 ++++++--
 .../unit/kafka/server/FetchRequestTest.scala    | 71 +++++++++++++++++++-
 .../server/SaslApiVersionsRequestTest.scala     |  4 +-
 5 files changed, 117 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0653a895/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index f70afe0..09df7b5 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -720,6 +720,7 @@ public class Selector implements Selectable, AutoCloseable {
         }
         this.sensors.connectionClosed.record();
         this.stagedReceives.remove(channel);
+        this.explicitlyMutedChannels.remove(channel);
         if (notifyDisconnect)
             this.disconnected.put(channel.id(), channel.state());
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0653a895/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index e3d1831..36b847b 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -34,6 +34,7 @@ import org.junit.Test;
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.nio.ByteBuffer;
@@ -85,9 +86,13 @@ public class SelectorTest {
 
     @After
     public void tearDown() throws Exception {
-        this.selector.close();
-        this.server.close();
-        this.metrics.close();
+        try {
+            verifySelectorEmpty();
+        } finally {
+            this.selector.close();
+            this.server.close();
+            this.metrics.close();
+        }
     }
 
     public SecurityProtocol securityProtocol() {
@@ -527,7 +532,6 @@ public class SelectorTest {
         control.verify();
     }
 
-
     private String blockingRequest(String node, String s) throws IOException {
         selector.send(createSend(node, s));
         selector.poll(1000L);
@@ -585,5 +589,21 @@ public class SelectorTest {
         }
     }
 
+    private void verifySelectorEmpty() throws Exception {
+        for (KafkaChannel channel : selector.channels())
+            selector.close(channel.id());
+        selector.poll(0);
+        selector.poll(0); // Poll a second time to clear everything
+        for (Field field : selector.getClass().getDeclaredFields()) {
+            field.setAccessible(true);
+            Object obj = field.get(selector);
+            if (obj instanceof Set)
+                assertTrue("Field not empty: " + field + " " + obj, ((Set<?>) obj).isEmpty());
+            else if (obj instanceof Map)
+                assertTrue("Field not empty: " + field + " " + obj, ((Map<?, ?>) obj).isEmpty());
+            else if (obj instanceof List)
+                assertTrue("Field not empty: " + field + " " + obj, ((List<?>) obj).isEmpty());
+        }
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0653a895/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
index b34b5a0..f91afd4 100644
--- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
@@ -110,7 +110,7 @@ abstract class BaseRequestTest extends KafkaServerTestHarness {
                      apiVersion: Option[Short] = None,
                      protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): ByteBuffer = {
     val socket = connect(destination, protocol)
-    try send(request, apiKey, socket, apiVersion)
+    try sendAndReceive(request, apiKey, socket, apiVersion)
     finally socket.close()
   }
 
@@ -123,18 +123,34 @@ abstract class BaseRequestTest extends KafkaServerTestHarness {
                            destination: SocketServer = anySocketServer,
                            protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): ByteBuffer = {
     val socket = connect(destination, protocol)
-    try sendStruct(requestStruct, apiKey, socket, apiVersion)
+    try sendStructAndReceive(requestStruct, apiKey, socket, apiVersion)
     finally socket.close()
   }
 
   /**
     * Serializes and sends the request to the given api.
-    * A ByteBuffer containing the response is returned.
     */
-  def send(request: AbstractRequest, apiKey: ApiKeys, socket: Socket, apiVersion: Option[Short] = None): ByteBuffer = {
+  def send(request: AbstractRequest, apiKey: ApiKeys, socket: Socket, apiVersion: Option[Short] = None): Unit = {
     val header = nextRequestHeader(apiKey, apiVersion.getOrElse(request.version))
     val serializedBytes = request.serialize(header).array
-    val response = requestAndReceive(socket, serializedBytes)
+    sendRequest(socket, serializedBytes)
+  }
+
+  /**
+   * Receive a response and return a ByteBuffer containing the response without the header
+   */
+  def receive(socket: Socket): ByteBuffer = {
+    val response = receiveResponse(socket)
+    skipResponseHeader(response)
+  }
+
+  /**
+    * Serializes and sends the request to the given api.
+    * A ByteBuffer containing the response is returned.
+    */
+  def sendAndReceive(request: AbstractRequest, apiKey: ApiKeys, socket: Socket, apiVersion: Option[Short] = None): ByteBuffer = {
+    send(request, apiKey, socket, apiVersion)
+    val response = receiveResponse(socket)
     skipResponseHeader(response)
   }
 
@@ -142,7 +158,7 @@ abstract class BaseRequestTest extends KafkaServerTestHarness {
     * Serializes and sends the requestStruct to the given api.
     * A ByteBuffer containing the response (without the response header) is returned.
     */
-  def sendStruct(requestStruct: Struct, apiKey: ApiKeys, socket: Socket, apiVersion: Short): ByteBuffer = {
+  def sendStructAndReceive(requestStruct: Struct, apiKey: ApiKeys, socket: Socket, apiVersion: Short): ByteBuffer = {
     val header = nextRequestHeader(apiKey, apiVersion)
     val serializedBytes = AbstractRequestResponse.serialize(header.toStruct, requestStruct).array
     val response = requestAndReceive(socket, serializedBytes)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0653a895/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index 329772b..726757d 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -16,6 +16,7 @@
   */
 package kafka.server
 
+import java.io.DataInputStream
 import java.util
 import java.util.Properties
 
@@ -23,12 +24,12 @@ import kafka.api.KAFKA_0_11_0_IV2
 import kafka.log.LogConfig
 import kafka.utils.TestUtils
 import kafka.utils.TestUtils._
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.{Record, RecordBatch}
 import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
-import org.apache.kafka.common.serialization.StringSerializer
+import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
 import org.junit.Assert._
 import org.junit.Test
 
@@ -44,7 +45,8 @@ class FetchRequestTest extends BaseRequestTest {
   private var producer: KafkaProducer[String, String] = null
 
   override def tearDown() {
-    producer.close()
+    if (producer != null)
+      producer.close()
     super.tearDown()
   }
 
@@ -169,6 +171,69 @@ class FetchRequestTest extends BaseRequestTest {
   }
 
   /**
+   * Tests that down-conversions don't leak memory. Large down conversions are triggered
+   * in the server. The client closes its connection after reading partial data when the
+   * channel is muted in the server. If buffers are not released, this will result in OOM.
+   */
+  @Test
+  def testDownConversionWithConnectionFailure(): Unit = {
+    val (topicPartition, leaderId) = createTopics(numTopics = 1, numPartitions = 1).head
+
+    val msgValueLen = 100 * 1000
+    val batchSize = 4 * msgValueLen
+    val propsOverride = new Properties
+    propsOverride.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize.toString)
+    val producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers),
+      retries = 5, lingerMs = Long.MaxValue,
+      keySerializer = new StringSerializer, valueSerializer = new ByteArraySerializer, props = Some(propsOverride))
+    val bytes = new Array[Byte](msgValueLen)
+    val futures = try {
+      (0 to 1000).map { _ =>
+        producer.send(new ProducerRecord(topicPartition.topic, topicPartition.partition, "key", bytes))
+      }
+    } finally {
+      producer.close()
+    }
+    // Check futures to ensure sends succeeded, but do this after close since the last
+    // (incomplete) batch is only sent when the producer is closed
+    futures.foreach(_.get)
+
+    def fetch(version: Short, maxPartitionBytes: Int, closeAfterPartialResponse: Boolean): Option[FetchResponse] = {
+      val fetchRequest = FetchRequest.Builder.forConsumer(Int.MaxValue, 0, createPartitionMap(maxPartitionBytes,
+        Seq(topicPartition))).build(version)
+
+      val socket = connect(brokerSocketServer(leaderId))
+      try {
+        send(fetchRequest, ApiKeys.FETCH, socket)
+        if (closeAfterPartialResponse) {
+          // read some data to ensure broker has muted this channel and then close socket
+          val size = new DataInputStream(socket.getInputStream).readInt()
+          // Check that we have received almost `maxPartitionBytes` (minus a tolerance) since in
+          // the case of OOM, the size will be significantly smaller. We can't check for exactly
+          // maxPartitionBytes since we use approx message sizes that include only the message value.
+          assertTrue(s"Fetch size too small $size, broker may have run out of memory",
+              size > maxPartitionBytes - batchSize)
+          None
+        } else {
+          Some(FetchResponse.parse(receive(socket), version))
+        }
+      } finally {
+        socket.close()
+      }
+    }
+
+    val version = 1.toShort
+    (0 to 15).foreach(_ => fetch(version, maxPartitionBytes = msgValueLen * 1000, closeAfterPartialResponse = true))
+
+    val response = fetch(version, maxPartitionBytes = batchSize, closeAfterPartialResponse = false)
+    val fetchResponse = response.getOrElse(throw new IllegalStateException("No fetch response"))
+    val partitionData = fetchResponse.responseData.get(topicPartition)
+    assertEquals(Errors.NONE, partitionData.error)
+    val batches = partitionData.records.batches.asScala.toBuffer
+    assertEquals(3, batches.size) // size is 3 (not 4) since maxPartitionBytes=msgValueLen*4 counts only message values, excluding key and header overhead
+  }
+
+  /**
     * Ensure that we respect the fetch offset when returning records that were converted from an uncompressed v2
     * record batch to multiple v0/v1 record batches with size 1. If the fetch offset points to inside the record batch,
     * some records have to be dropped during the conversion.

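For illustration, the partial-read-then-close pattern used by
testDownConversionWithConnectionFailure above can be sketched as a standalone,
hypothetical class (host, port and the raw request bytes are placeholders;
the real test uses connect() and send() from BaseRequestTest):

    import java.io.DataInputStream;
    import java.net.Socket;

    // Hypothetical sketch: read only the 4-byte size prefix of the broker's
    // response, then close the socket while the channel is still muted on
    // the broker side. Without the Selector fix, the muted channel (and its
    // buffered down-converted response) would leak.
    class PartialReadSketch {
        static int readSizeThenClose(String host, int port, byte[] request) throws Exception {
            try (Socket socket = new Socket(host, port)) {
                socket.getOutputStream().write(request);
                // Reading just the size prefix proves the broker has started
                // sending the response; closing now abandons the rest of it.
                return new DataInputStream(socket.getInputStream()).readInt();
            }
        }
    }
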
http://git-wip-us.apache.org/repos/asf/kafka/blob/0653a895/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
index 7703993..00b9934 100644
--- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
@@ -90,13 +90,13 @@ class SaslApiVersionsRequestTest extends BaseRequestTest with SaslSetup {
 
   private def sendApiVersionsRequest(socket: Socket, request: ApiVersionsRequest,
                                      apiVersion: Option[Short] = None): ApiVersionsResponse = {
-    val response = send(request, ApiKeys.API_VERSIONS, socket, apiVersion)
+    val response = sendAndReceive(request, ApiKeys.API_VERSIONS, socket, apiVersion)
     ApiVersionsResponse.parse(response, request.version)
   }
 
   private def sendSaslHandshakeRequestValidateResponse(socket: Socket) {
     val request = new SaslHandshakeRequest("PLAIN")
-    val response = send(request, ApiKeys.SASL_HANDSHAKE, socket)
+    val response = sendAndReceive(request, ApiKeys.SASL_HANDSHAKE, socket)
     val handshakeResponse = SaslHandshakeResponse.parse(response, request.version)
     assertEquals(Errors.NONE, handshakeResponse.error)
     assertEquals(Collections.singletonList("PLAIN"), handshakeResponse.enabledMechanisms)