You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2022/04/05 07:27:54 UTC
[kafka] branch trunk updated: KAFKA-13791: Fix potential race condition in FetchResponse#`fetchData` and `forgottenTopics` (#11981)
This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 481cc13a13 KAFKA-13791: Fix potential race condition in FetchResponse#`fetchData` and `forgottenTopics` (#11981)
481cc13a13 is described below
commit 481cc13a132d33f23e737f88ae28a1aac135afed
Author: yun-yun <70...@qq.com>
AuthorDate: Tue Apr 5 15:27:32 2022 +0800
KAFKA-13791: Fix potential race condition in FetchResponse#`fetchData` and `forgottenTopics` (#11981)
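Fix FetchRequest#`fetchData` and `forgottenTopics`: with double-checked locking, the lazy-initialized member must be assigned only in the last step, after the collection has been fully populated, so that other threads cannot observe a half-initialized object.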
Reviewers: Luke Chen <sh...@gmail.com>
---
.../org/apache/kafka/common/requests/FetchRequest.java | 16 +++++++++++-----
.../org/apache/kafka/common/requests/FetchResponse.java | 2 +-
2 files changed, 12 insertions(+), 6 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 48ba022610..09242bfc4b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -351,8 +351,10 @@ public class FetchRequest extends AbstractRequest {
if (fetchData == null) {
synchronized (this) {
if (fetchData == null) {
- fetchData = new LinkedHashMap<>();
- short version = version();
+ // Assigning the lazy-initialized `fetchData` in the last step
+ // to avoid other threads accessing a half-initialized object.
+ final LinkedHashMap<TopicIdPartition, PartitionData> fetchDataTmp = new LinkedHashMap<>();
+ final short version = version();
data.topics().forEach(fetchTopic -> {
String name;
if (version < 13) {
@@ -362,7 +364,7 @@ public class FetchRequest extends AbstractRequest {
}
fetchTopic.partitions().forEach(fetchPartition ->
// Topic name may be null here if the topic name was unable to be resolved using the topicNames map.
- fetchData.put(new TopicIdPartition(fetchTopic.topicId(), new TopicPartition(name, fetchPartition.partition())),
+ fetchDataTmp.put(new TopicIdPartition(fetchTopic.topicId(), new TopicPartition(name, fetchPartition.partition())),
new PartitionData(
fetchTopic.topicId(),
fetchPartition.fetchOffset(),
@@ -374,6 +376,7 @@ public class FetchRequest extends AbstractRequest {
)
);
});
+ fetchData = fetchDataTmp;
}
}
}
@@ -386,7 +389,9 @@ public class FetchRequest extends AbstractRequest {
if (toForget == null) {
synchronized (this) {
if (toForget == null) {
- toForget = new ArrayList<>();
+ // Assigning the lazy-initialized `toForget` in the last step
+ // to avoid other threads accessing a half-initialized object.
+ final List<TopicIdPartition> toForgetTmp = new ArrayList<>();
data.forgottenTopicsData().forEach(forgottenTopic -> {
String name;
if (version() < 13) {
@@ -395,8 +400,9 @@ public class FetchRequest extends AbstractRequest {
name = topicNames.get(forgottenTopic.topicId());
}
// Topic name may be null here if the topic name was unable to be resolved using the topicNames map.
- forgottenTopic.partitions().forEach(partitionId -> toForget.add(new TopicIdPartition(forgottenTopic.topicId(), new TopicPartition(name, partitionId))));
+ forgottenTopic.partitions().forEach(partitionId -> toForgetTmp.add(new TopicIdPartition(forgottenTopic.topicId(), new TopicPartition(name, partitionId))));
});
+ toForget = toForgetTmp;
}
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index 0d7049d755..a4af4ca2a2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -100,7 +100,7 @@ public class FetchResponse extends AbstractResponse {
if (responseData == null) {
synchronized (this) {
if (responseData == null) {
- // Assigning the lazy-initialized responseData in the last step
+ // Assigning the lazy-initialized `responseData` in the last step
// to avoid other threads accessing a half-initialized object.
final LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> responseDataTmp =
new LinkedHashMap<>();