You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2023/05/21 21:39:40 UTC

[kafka] branch trunk updated: KAFKA-14862 (HOTFIX): Fix ConcurrentModificationException (#13734)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e96a463561c KAFKA-14862 (HOTFIX): Fix ConcurrentModificationException (#13734)
e96a463561c is described below

commit e96a463561ca8974fca37562b8675ae8ae4aff29
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Sun May 21 14:39:12 2023 -0700

    KAFKA-14862 (HOTFIX): Fix ConcurrentModificationException (#13734)
    
    Reviewers: Walker Carlson <<w...@confluent.io>
---
 .../org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java   | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
index f4ad9ac682f..8e0fcfece0e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
@@ -44,11 +44,11 @@ import org.apache.kafka.streams.state.internals.LeftOrRightValueSerde;
 
 import java.time.Duration;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
 import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
@@ -60,7 +60,7 @@ class KStreamImplJoin {
     private final boolean rightOuter;
 
     static class TimeTrackerSupplier {
-        private final Map<TaskId, TimeTracker> tracker = new HashMap<>();
+        private final Map<TaskId, TimeTracker> tracker = new ConcurrentHashMap<>();
 
         public TimeTracker get(final TaskId taskId) {
             return tracker.computeIfAbsent(taskId, taskId1 -> new TimeTracker());