You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2017/03/16 16:14:03 UTC

kafka git commit: KAFKA-4901; Make ProduceRequest thread-safe

Repository: kafka
Updated Branches:
  refs/heads/trunk be1127281 -> 1659ca177


KAFKA-4901; Make ProduceRequest thread-safe

If request logging is enabled, `ProduceRequest` can be accessed
and mutated concurrently from a network thread (which calls
`toString`) and a request handler thread (which calls
`clearPartitionRecords()`).

That can lead to a `ConcurrentModificationException` when iterating
the `partitionRecords` map.

The underlying thread-safety issue has existed since the server
started using the Java implementation of ProduceRequest in 0.10.0.
However, we were incorrectly not clearing the underlying struct until
0.10.2, so `toString` itself was thread-safe until that change. In 0.10.2,
`toString` is no longer thread-safe and we could potentially see a
`NullPointerException` given the right set of interleavings between
`toString` and `clearPartitionRecords` although we haven't seen that
happen yet.

In trunk, we changed the requests to have a `toStruct` method
instead of creating a struct in the constructor and `toString` was
no longer printing the contents of the `Struct`. This accidentally
fixed the race condition, but it meant that request logging was less
useful.

A couple of days ago, `AbstractRequest.toString` was changed to
print the contents of the request by calling `toStruct().toString()`
and reintroduced the race condition. The impact is more visible
because we iterate over a `HashMap`, which proactively
checks for concurrent modification (unlike arrays).

We will need a separate PR for 0.10.2.

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Jiangjie Qin <be...@gmail.com>, Onur Karaman <ok...@linkedin.com>, Jun Rao <ju...@gmail.com>

Closes #2689 from ijuma/produce-request-thread-safety


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1659ca17
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1659ca17
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1659ca17

Branch: refs/heads/trunk
Commit: 1659ca1773596b5889fe8b2f163196f7b32ea70a
Parents: be11272
Author: Ismael Juma <is...@juma.me.uk>
Authored: Thu Mar 16 09:13:58 2017 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Mar 16 09:13:58 2017 -0700

----------------------------------------------------------------------
 .../kafka/common/requests/AbstractRequest.java  |  8 ++-
 .../kafka/common/requests/ProduceRequest.java   | 56 ++++++++++++++++++--
 .../common/requests/RequestResponseTest.java    | 50 +++++++++++++++++
 .../scala/kafka/network/RequestChannel.scala    |  2 +-
 .../src/main/scala/kafka/server/KafkaApis.scala | 14 ++---
 5 files changed, 115 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1659ca17/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 3d79f7e..7cfc54f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -79,11 +79,15 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
 
     protected abstract Struct toStruct();
 
-    @Override
-    public String toString() {
+    public String toString(boolean verbose) {
         return toStruct().toString();
     }
 
+    @Override
+    public final String toString() {
+        return toString(true);
+    }
+
     /**
      * Get an error response for a request
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/1659ca17/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index 5e4b5fb..df3b4fc 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -78,13 +79,27 @@ public class ProduceRequest extends AbstractRequest {
 
     private final short acks;
     private final int timeout;
-    private final Map<TopicPartition, MemoryRecords> partitionRecords;
+
+    private final Map<TopicPartition, Integer> partitionSizes;
+
+    // This is set to null by `clearPartitionRecords` to prevent unnecessary memory retention when a produce request is
+    // put in the purgatory (due to client throttling, it can take a while before the response is sent).
+    // Care should be taken in methods that use this field.
+    private volatile Map<TopicPartition, MemoryRecords> partitionRecords;
 
     private ProduceRequest(short version, short acks, int timeout, Map<TopicPartition, MemoryRecords> partitionRecords) {
         super(version);
         this.acks = acks;
         this.timeout = timeout;
         this.partitionRecords = partitionRecords;
+        this.partitionSizes = createPartitionSizes(partitionRecords);
+    }
+
+    private static Map<TopicPartition, Integer> createPartitionSizes(Map<TopicPartition, MemoryRecords> partitionRecords) {
+        Map<TopicPartition, Integer> result = new HashMap<>(partitionRecords.size());
+        for (Map.Entry<TopicPartition, MemoryRecords> entry : partitionRecords.entrySet())
+            result.put(entry.getKey(), entry.getValue().sizeInBytes());
+        return result;
     }
 
     public ProduceRequest(Struct struct, short version) {
@@ -100,6 +115,7 @@ public class ProduceRequest extends AbstractRequest {
                 partitionRecords.put(new TopicPartition(topic, partition), records);
             }
         }
+        partitionSizes = createPartitionSizes(partitionRecords);
         acks = struct.getShort(ACKS_KEY_NAME);
         timeout = struct.getInt(TIMEOUT_KEY_NAME);
     }
@@ -109,6 +125,8 @@ public class ProduceRequest extends AbstractRequest {
      */
     @Override
     public Struct toStruct() {
+        // Store it in a local variable to protect against concurrent updates
+        Map<TopicPartition, MemoryRecords> partitionRecords = partitionRecordsOrFail();
         Struct struct = new Struct(ApiKeys.PRODUCE.requestSchema(version()));
         Map<String, Map<Integer, MemoryRecords>> recordsByTopic = CollectionUtils.groupDataByTopic(partitionRecords);
         struct.set(ACKS_KEY_NAME, acks);
@@ -133,6 +151,22 @@ public class ProduceRequest extends AbstractRequest {
     }
 
     @Override
+    public String toString(boolean verbose) {
+        // Use the same format as `Struct.toString()`
+        StringBuilder bld = new StringBuilder();
+        bld.append("{acks=").append(acks)
+                .append(",timeout=").append(timeout);
+
+        if (verbose)
+            bld.append(",partitionSizes=").append(Utils.mkString(partitionSizes, "[", "]", "=", ","));
+        else
+            bld.append(",numPartitions=").append(partitionSizes.size());
+
+        bld.append("}");
+        return bld.toString();
+    }
+
+    @Override
     public AbstractResponse getErrorResponse(Throwable e) {
         /* In case the producer doesn't actually want any response */
         if (acks == 0)
@@ -141,8 +175,8 @@ public class ProduceRequest extends AbstractRequest {
         Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<>();
         ProduceResponse.PartitionResponse partitionResponse = new ProduceResponse.PartitionResponse(Errors.forException(e));
 
-        for (Map.Entry<TopicPartition, MemoryRecords> entry : partitionRecords.entrySet())
-            responseMap.put(entry.getKey(), partitionResponse);
+        for (TopicPartition tp : partitions())
+            responseMap.put(tp, partitionResponse);
 
         short versionId = version();
         switch (versionId) {
@@ -156,6 +190,10 @@ public class ProduceRequest extends AbstractRequest {
         }
     }
 
+    private Collection<TopicPartition> partitions() {
+        return partitionSizes.keySet();
+    }
+
     public short acks() {
         return acks;
     }
@@ -164,12 +202,20 @@ public class ProduceRequest extends AbstractRequest {
         return timeout;
     }
 
-    public Map<TopicPartition, MemoryRecords> partitionRecords() {
+    /**
+     * Returns the partition records or throws IllegalStateException if clearPartitionRecords() has been invoked.
+     */
+    public Map<TopicPartition, MemoryRecords> partitionRecordsOrFail() {
+        // Store it in a local variable to protect against concurrent updates
+        Map<TopicPartition, MemoryRecords> partitionRecords = this.partitionRecords;
+        if (partitionRecords == null)
+            throw new IllegalStateException("The partition records are no longer available because " +
+                    "clearPartitionRecords() has been invoked.");
         return partitionRecords;
     }
 
     public void clearPartitionRecords() {
-        partitionRecords.clear();
+        partitionRecords = null;
     }
 
     public static ProduceRequest parse(ByteBuffer buffer, short version) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1659ca17/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 095b663..64bfdf5 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.NotCoordinatorForGroupException;
+import org.apache.kafka.common.errors.NotEnoughReplicasException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.network.ListenerName;
@@ -48,7 +49,9 @@ import java.util.Set;
 
 import static java.util.Collections.singletonList;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class RequestResponseTest {
 
@@ -212,6 +215,53 @@ public class RequestResponseTest {
     }
 
     @Test
+    public void produceRequestToStringTest() {
+        ProduceRequest request = createProduceRequest();
+        assertEquals(1, request.partitionRecordsOrFail().size());
+        assertFalse(request.toString(false).contains("partitionSizes"));
+        assertTrue(request.toString(false).contains("numPartitions=1"));
+        assertTrue(request.toString(true).contains("partitionSizes"));
+        assertFalse(request.toString(true).contains("numPartitions"));
+
+        request.clearPartitionRecords();
+        try {
+            request.partitionRecordsOrFail();
+            fail("partitionRecordsOrFail should fail after clearPartitionRecords()");
+        } catch (IllegalStateException e) {
+            // OK
+        }
+
+        // `toString` should behave the same after `clearPartitionRecords`
+        assertFalse(request.toString(false).contains("partitionSizes"));
+        assertTrue(request.toString(false).contains("numPartitions=1"));
+        assertTrue(request.toString(true).contains("partitionSizes"));
+        assertFalse(request.toString(true).contains("numPartitions"));
+    }
+
+    @Test
+    public void produceRequestGetErrorResponseTest() {
+        ProduceRequest request = createProduceRequest();
+        Set<TopicPartition> partitions = new HashSet<>(request.partitionRecordsOrFail().keySet());
+
+        ProduceResponse errorResponse = (ProduceResponse) request.getErrorResponse(new NotEnoughReplicasException());
+        assertEquals(partitions, errorResponse.responses().keySet());
+        ProduceResponse.PartitionResponse partitionResponse = errorResponse.responses().values().iterator().next();
+        assertEquals(Errors.NOT_ENOUGH_REPLICAS, partitionResponse.error);
+        assertEquals(ProduceResponse.INVALID_OFFSET, partitionResponse.baseOffset);
+        assertEquals(Record.NO_TIMESTAMP, partitionResponse.logAppendTime);
+
+        request.clearPartitionRecords();
+
+        // `getErrorResponse` should behave the same after `clearPartitionRecords`
+        errorResponse = (ProduceResponse) request.getErrorResponse(new NotEnoughReplicasException());
+        assertEquals(partitions, errorResponse.responses().keySet());
+        partitionResponse = errorResponse.responses().values().iterator().next();
+        assertEquals(Errors.NOT_ENOUGH_REPLICAS, partitionResponse.error);
+        assertEquals(ProduceResponse.INVALID_OFFSET, partitionResponse.baseOffset);
+        assertEquals(Record.NO_TIMESTAMP, partitionResponse.logAppendTime);
+    }
+
+    @Test
     public void produceResponseVersionTest() {
         Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
         responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE,

http://git-wip-us.apache.org/repos/asf/kafka/blob/1659ca17/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index ad7a3fd..d3b1a0a 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -108,7 +108,7 @@ object RequestChannel extends Logging {
       if (requestObj != null)
         requestObj.describe(details)
       else
-        s"$header -- ${body[AbstractRequest]}"
+        s"$header -- ${body[AbstractRequest].toString(details)}"
     }
 
     def body[T <: AbstractRequest](implicit classTag: ClassTag[T], nn: NotNothing[T]): T = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1659ca17/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 24a224a..fbc0840 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -351,12 +351,13 @@ class KafkaApis(val requestChannel: RequestChannel,
     val produceRequest = request.body[ProduceRequest]
     val numBytesAppended = request.header.toStruct.sizeOf + request.bodyAndSize.size
 
-    val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = produceRequest.partitionRecords.asScala.partition {
-      case (topicPartition, _) => authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic)) && metadataCache.contains(topicPartition.topic)
-    }
+    val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) =
+      produceRequest.partitionRecordsOrFail.asScala.partition { case (tp, _) =>
+        authorize(request.session, Describe, new Resource(auth.Topic, tp.topic)) && metadataCache.contains(tp.topic)
+      }
 
     val (authorizedRequestInfo, unauthorizedForWriteRequestInfo) = existingAndAuthorizedForDescribeTopics.partition {
-      case (topicPartition, _) => authorize(request.session, Write, new Resource(auth.Topic, topicPartition.topic))
+      case (tp, _) => authorize(request.session, Write, new Resource(auth.Topic, tp.topic))
     }
 
     // the callback for sending a produce response
@@ -426,9 +427,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         authorizedRequestInfo,
         sendResponseCallback)
 
-      // if the request is put into the purgatory, it will have a held reference
-      // and hence cannot be garbage collected; hence we clear its data here in
-      // order to let GC re-claim its memory since it is already appended to log
+      // if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected;
+      // hence we clear its data here inorder to let GC re-claim its memory since it is already appended to log
       produceRequest.clearPartitionRecords()
     }
   }