You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2022/04/05 07:27:54 UTC

[kafka] branch trunk updated: KAFKA-13791: Fix potential race condition in FetchResponse#`fetchData` and `forgottenTopics` (#11981)

This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 481cc13a13 KAFKA-13791: Fix potential race condition in FetchResponse#`fetchData` and `forgottenTopics` (#11981)
481cc13a13 is described below

commit 481cc13a132d33f23e737f88ae28a1aac135afed
Author: yun-yun <70...@qq.com>
AuthorDate: Tue Apr 5 15:27:32 2022 +0800

    KAFKA-13791: Fix potential race condition in FetchResponse#`fetchData` and `forgottenTopics` (#11981)
    
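    Fix FetchRequest#`fetchData` and `forgottenTopics`: with double-checked locking, a lazy-initialized member must be assigned only as the last step, after it has been fully populated, so that other threads never observe a half-initialized object.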
    
    Reviewers: Luke Chen <sh...@gmail.com>
---
 .../org/apache/kafka/common/requests/FetchRequest.java   | 16 +++++++++++-----
 .../org/apache/kafka/common/requests/FetchResponse.java  |  2 +-
 2 files changed, 12 insertions(+), 6 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 48ba022610..09242bfc4b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -351,8 +351,10 @@ public class FetchRequest extends AbstractRequest {
         if (fetchData == null) {
             synchronized (this) {
                 if (fetchData == null) {
-                    fetchData = new LinkedHashMap<>();
-                    short version = version();
+                    // Assigning the lazy-initialized `fetchData` in the last step
+                    // to avoid other threads accessing a half-initialized object.
+                    final LinkedHashMap<TopicIdPartition, PartitionData> fetchDataTmp = new LinkedHashMap<>();
+                    final short version = version();
                     data.topics().forEach(fetchTopic -> {
                         String name;
                         if (version < 13) {
@@ -362,7 +364,7 @@ public class FetchRequest extends AbstractRequest {
                         }
                         fetchTopic.partitions().forEach(fetchPartition ->
                                 // Topic name may be null here if the topic name was unable to be resolved using the topicNames map.
-                                fetchData.put(new TopicIdPartition(fetchTopic.topicId(), new TopicPartition(name, fetchPartition.partition())),
+                                fetchDataTmp.put(new TopicIdPartition(fetchTopic.topicId(), new TopicPartition(name, fetchPartition.partition())),
                                         new PartitionData(
                                                 fetchTopic.topicId(),
                                                 fetchPartition.fetchOffset(),
@@ -374,6 +376,7 @@ public class FetchRequest extends AbstractRequest {
                                 )
                         );
                     });
+                    fetchData = fetchDataTmp;
                 }
             }
         }
@@ -386,7 +389,9 @@ public class FetchRequest extends AbstractRequest {
         if (toForget == null) {
             synchronized (this) {
                 if (toForget == null) {
-                    toForget = new ArrayList<>();
+                    // Assigning the lazy-initialized `toForget` in the last step
+                    // to avoid other threads accessing a half-initialized object.
+                    final List<TopicIdPartition> toForgetTmp = new ArrayList<>();
                     data.forgottenTopicsData().forEach(forgottenTopic -> {
                         String name;
                         if (version() < 13) {
@@ -395,8 +400,9 @@ public class FetchRequest extends AbstractRequest {
                             name = topicNames.get(forgottenTopic.topicId());
                         }
                         // Topic name may be null here if the topic name was unable to be resolved using the topicNames map.
-                        forgottenTopic.partitions().forEach(partitionId -> toForget.add(new TopicIdPartition(forgottenTopic.topicId(), new TopicPartition(name, partitionId))));
+                        forgottenTopic.partitions().forEach(partitionId -> toForgetTmp.add(new TopicIdPartition(forgottenTopic.topicId(), new TopicPartition(name, partitionId))));
                     });
+                    toForget = toForgetTmp;
                 }
             }
         }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index 0d7049d755..a4af4ca2a2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -100,7 +100,7 @@ public class FetchResponse extends AbstractResponse {
         if (responseData == null) {
             synchronized (this) {
                 if (responseData == null) {
-                    // Assigning the lazy-initialized responseData in the last step
+                    // Assigning the lazy-initialized `responseData` in the last step
                     // to avoid other threads accessing a half-initialized object.
                     final LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> responseDataTmp =
                             new LinkedHashMap<>();