You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2017/03/16 16:14:03 UTC
kafka git commit: KAFKA-4901; Make ProduceRequest thread-safe
Repository: kafka
Updated Branches:
refs/heads/trunk be1127281 -> 1659ca177
KAFKA-4901; Make ProduceRequest thread-safe
If request logging is enabled, `ProduceRequest` can be accessed
and mutated concurrently from a network thread (which calls
`toString`) and a request handler thread (which calls
`clearPartitionRecords()`).
That can lead to a `ConcurrentModificationException` when iterating
the `partitionRecords` map.
The underlying thread-safety issue has existed since the server
started using the Java implementation of ProduceRequest in 0.10.0.
However, until 0.10.2 we were (incorrectly) not clearing the underlying
struct, so `toString` itself remained thread-safe until that change. In
0.10.2, `toString` is no longer thread-safe, and we could potentially see a
`NullPointerException` given the right interleaving of `toString` and
`clearPartitionRecords`, although we haven't seen that happen yet.
In trunk, we changed the requests to have a `toStruct` method
instead of creating a struct in the constructor, and `toString`
no longer printed the contents of the `Struct`. This accidentally
fixed the race condition, but it made request logging less
useful.
A couple of days ago, `AbstractRequest.toString` was changed to
print the contents of the request by calling `toStruct().toString()`,
which reintroduced the race condition. The impact is more visible now
because we iterate over a `HashMap`, whose fail-fast iterators proactively
check for concurrent modification and throw `ConcurrentModificationException`
(unlike arrays, which silently observe whatever state they find).
We will need a separate PR for 0.10.2.
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Jiangjie Qin <be...@gmail.com>, Onur Karaman <ok...@linkedin.com>, Jun Rao <ju...@gmail.com>
Closes #2689 from ijuma/produce-request-thread-safety
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1659ca17
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1659ca17
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1659ca17
Branch: refs/heads/trunk
Commit: 1659ca1773596b5889fe8b2f163196f7b32ea70a
Parents: be11272
Author: Ismael Juma <is...@juma.me.uk>
Authored: Thu Mar 16 09:13:58 2017 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Mar 16 09:13:58 2017 -0700
----------------------------------------------------------------------
.../kafka/common/requests/AbstractRequest.java | 8 ++-
.../kafka/common/requests/ProduceRequest.java | 56 ++++++++++++++++++--
.../common/requests/RequestResponseTest.java | 50 +++++++++++++++++
.../scala/kafka/network/RequestChannel.scala | 2 +-
.../src/main/scala/kafka/server/KafkaApis.scala | 14 ++---
5 files changed, 115 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/1659ca17/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 3d79f7e..7cfc54f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -79,11 +79,15 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
protected abstract Struct toStruct();
- @Override
- public String toString() {
+ public String toString(boolean verbose) {
return toStruct().toString();
}
+ @Override
+ public final String toString() {
+ return toString(true);
+ }
+
/**
* Get an error response for a request
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/1659ca17/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index 5e4b5fb..df3b4fc 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.utils.Utils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -78,13 +79,27 @@ public class ProduceRequest extends AbstractRequest {
private final short acks;
private final int timeout;
- private final Map<TopicPartition, MemoryRecords> partitionRecords;
+
+ private final Map<TopicPartition, Integer> partitionSizes;
+
+ // This is set to null by `clearPartitionRecords` to prevent unnecessary memory retention when a produce request is
+ // put in the purgatory (due to client throttling, it can take a while before the response is sent).
+ // Care should be taken in methods that use this field.
+ private volatile Map<TopicPartition, MemoryRecords> partitionRecords;
private ProduceRequest(short version, short acks, int timeout, Map<TopicPartition, MemoryRecords> partitionRecords) {
super(version);
this.acks = acks;
this.timeout = timeout;
this.partitionRecords = partitionRecords;
+ this.partitionSizes = createPartitionSizes(partitionRecords);
+ }
+
+ private static Map<TopicPartition, Integer> createPartitionSizes(Map<TopicPartition, MemoryRecords> partitionRecords) {
+ Map<TopicPartition, Integer> result = new HashMap<>(partitionRecords.size());
+ for (Map.Entry<TopicPartition, MemoryRecords> entry : partitionRecords.entrySet())
+ result.put(entry.getKey(), entry.getValue().sizeInBytes());
+ return result;
}
public ProduceRequest(Struct struct, short version) {
@@ -100,6 +115,7 @@ public class ProduceRequest extends AbstractRequest {
partitionRecords.put(new TopicPartition(topic, partition), records);
}
}
+ partitionSizes = createPartitionSizes(partitionRecords);
acks = struct.getShort(ACKS_KEY_NAME);
timeout = struct.getInt(TIMEOUT_KEY_NAME);
}
@@ -109,6 +125,8 @@ public class ProduceRequest extends AbstractRequest {
*/
@Override
public Struct toStruct() {
+ // Store it in a local variable to protect against concurrent updates
+ Map<TopicPartition, MemoryRecords> partitionRecords = partitionRecordsOrFail();
Struct struct = new Struct(ApiKeys.PRODUCE.requestSchema(version()));
Map<String, Map<Integer, MemoryRecords>> recordsByTopic = CollectionUtils.groupDataByTopic(partitionRecords);
struct.set(ACKS_KEY_NAME, acks);
@@ -133,6 +151,22 @@ public class ProduceRequest extends AbstractRequest {
}
@Override
+ public String toString(boolean verbose) {
+ // Use the same format as `Struct.toString()`
+ StringBuilder bld = new StringBuilder();
+ bld.append("{acks=").append(acks)
+ .append(",timeout=").append(timeout);
+
+ if (verbose)
+ bld.append(",partitionSizes=").append(Utils.mkString(partitionSizes, "[", "]", "=", ","));
+ else
+ bld.append(",numPartitions=").append(partitionSizes.size());
+
+ bld.append("}");
+ return bld.toString();
+ }
+
+ @Override
public AbstractResponse getErrorResponse(Throwable e) {
/* In case the producer doesn't actually want any response */
if (acks == 0)
@@ -141,8 +175,8 @@ public class ProduceRequest extends AbstractRequest {
Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<>();
ProduceResponse.PartitionResponse partitionResponse = new ProduceResponse.PartitionResponse(Errors.forException(e));
- for (Map.Entry<TopicPartition, MemoryRecords> entry : partitionRecords.entrySet())
- responseMap.put(entry.getKey(), partitionResponse);
+ for (TopicPartition tp : partitions())
+ responseMap.put(tp, partitionResponse);
short versionId = version();
switch (versionId) {
@@ -156,6 +190,10 @@ public class ProduceRequest extends AbstractRequest {
}
}
+ private Collection<TopicPartition> partitions() {
+ return partitionSizes.keySet();
+ }
+
public short acks() {
return acks;
}
@@ -164,12 +202,20 @@ public class ProduceRequest extends AbstractRequest {
return timeout;
}
- public Map<TopicPartition, MemoryRecords> partitionRecords() {
+ /**
+ * Returns the partition records or throws IllegalStateException if clearPartitionRecords() has been invoked.
+ */
+ public Map<TopicPartition, MemoryRecords> partitionRecordsOrFail() {
+ // Store it in a local variable to protect against concurrent updates
+ Map<TopicPartition, MemoryRecords> partitionRecords = this.partitionRecords;
+ if (partitionRecords == null)
+ throw new IllegalStateException("The partition records are no longer available because " +
+ "clearPartitionRecords() has been invoked.");
return partitionRecords;
}
public void clearPartitionRecords() {
- partitionRecords.clear();
+ partitionRecords = null;
}
public static ProduceRequest parse(ByteBuffer buffer, short version) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/1659ca17/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 095b663..64bfdf5 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.NotCoordinatorForGroupException;
+import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.network.ListenerName;
@@ -48,7 +49,9 @@ import java.util.Set;
import static java.util.Collections.singletonList;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
public class RequestResponseTest {
@@ -212,6 +215,53 @@ public class RequestResponseTest {
}
@Test
+ public void produceRequestToStringTest() {
+ ProduceRequest request = createProduceRequest();
+ assertEquals(1, request.partitionRecordsOrFail().size());
+ assertFalse(request.toString(false).contains("partitionSizes"));
+ assertTrue(request.toString(false).contains("numPartitions=1"));
+ assertTrue(request.toString(true).contains("partitionSizes"));
+ assertFalse(request.toString(true).contains("numPartitions"));
+
+ request.clearPartitionRecords();
+ try {
+ request.partitionRecordsOrFail();
+ fail("partitionRecordsOrFail should fail after clearPartitionRecords()");
+ } catch (IllegalStateException e) {
+ // OK
+ }
+
+ // `toString` should behave the same after `clearPartitionRecords`
+ assertFalse(request.toString(false).contains("partitionSizes"));
+ assertTrue(request.toString(false).contains("numPartitions=1"));
+ assertTrue(request.toString(true).contains("partitionSizes"));
+ assertFalse(request.toString(true).contains("numPartitions"));
+ }
+
+ @Test
+ public void produceRequestGetErrorResponseTest() {
+ ProduceRequest request = createProduceRequest();
+ Set<TopicPartition> partitions = new HashSet<>(request.partitionRecordsOrFail().keySet());
+
+ ProduceResponse errorResponse = (ProduceResponse) request.getErrorResponse(new NotEnoughReplicasException());
+ assertEquals(partitions, errorResponse.responses().keySet());
+ ProduceResponse.PartitionResponse partitionResponse = errorResponse.responses().values().iterator().next();
+ assertEquals(Errors.NOT_ENOUGH_REPLICAS, partitionResponse.error);
+ assertEquals(ProduceResponse.INVALID_OFFSET, partitionResponse.baseOffset);
+ assertEquals(Record.NO_TIMESTAMP, partitionResponse.logAppendTime);
+
+ request.clearPartitionRecords();
+
+ // `getErrorResponse` should behave the same after `clearPartitionRecords`
+ errorResponse = (ProduceResponse) request.getErrorResponse(new NotEnoughReplicasException());
+ assertEquals(partitions, errorResponse.responses().keySet());
+ partitionResponse = errorResponse.responses().values().iterator().next();
+ assertEquals(Errors.NOT_ENOUGH_REPLICAS, partitionResponse.error);
+ assertEquals(ProduceResponse.INVALID_OFFSET, partitionResponse.baseOffset);
+ assertEquals(Record.NO_TIMESTAMP, partitionResponse.logAppendTime);
+ }
+
+ @Test
public void produceResponseVersionTest() {
Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE,
http://git-wip-us.apache.org/repos/asf/kafka/blob/1659ca17/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index ad7a3fd..d3b1a0a 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -108,7 +108,7 @@ object RequestChannel extends Logging {
if (requestObj != null)
requestObj.describe(details)
else
- s"$header -- ${body[AbstractRequest]}"
+ s"$header -- ${body[AbstractRequest].toString(details)}"
}
def body[T <: AbstractRequest](implicit classTag: ClassTag[T], nn: NotNothing[T]): T = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/1659ca17/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 24a224a..fbc0840 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -351,12 +351,13 @@ class KafkaApis(val requestChannel: RequestChannel,
val produceRequest = request.body[ProduceRequest]
val numBytesAppended = request.header.toStruct.sizeOf + request.bodyAndSize.size
- val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = produceRequest.partitionRecords.asScala.partition {
- case (topicPartition, _) => authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic)) && metadataCache.contains(topicPartition.topic)
- }
+ val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) =
+ produceRequest.partitionRecordsOrFail.asScala.partition { case (tp, _) =>
+ authorize(request.session, Describe, new Resource(auth.Topic, tp.topic)) && metadataCache.contains(tp.topic)
+ }
val (authorizedRequestInfo, unauthorizedForWriteRequestInfo) = existingAndAuthorizedForDescribeTopics.partition {
- case (topicPartition, _) => authorize(request.session, Write, new Resource(auth.Topic, topicPartition.topic))
+ case (tp, _) => authorize(request.session, Write, new Resource(auth.Topic, tp.topic))
}
// the callback for sending a produce response
@@ -426,9 +427,8 @@ class KafkaApis(val requestChannel: RequestChannel,
authorizedRequestInfo,
sendResponseCallback)
- // if the request is put into the purgatory, it will have a held reference
- // and hence cannot be garbage collected; hence we clear its data here in
- // order to let GC re-claim its memory since it is already appended to log
+ // if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected;
+ // hence we clear its data here inorder to let GC re-claim its memory since it is already appended to log
produceRequest.clearPartitionRecords()
}
}