You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/05/01 16:07:57 UTC

[kafka] branch trunk updated: KAFKA-8298: Fix possible concurrent modification exception (#6643)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ba1fc21  KAFKA-8298: Fix possible concurrent modification exception (#6643)
ba1fc21 is described below

commit ba1fc21864bbd4c1374695890f98ff1fa2614504
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Wed May 1 12:07:45 2019 -0400

    KAFKA-8298: Fix possible concurrent modification exception (#6643)
    
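    When processing multiple key-changing operations during the optimization phase, a ConcurrentModificationException is possible because entries are removed directly from the map while its entry set is being iterated; the entries are now removed through the iterator instead.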
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 .../kstream/internals/InternalStreamsBuilder.java  |  8 ++++--
 .../kstream/internals/graph/StreamsGraphTest.java  | 33 ++++++++++++++++++++++
 2 files changed, 39 insertions(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 960b030..e7a7678 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -42,9 +42,11 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.PriorityQueue;
 import java.util.Properties;
@@ -311,8 +313,10 @@ public class InternalStreamsBuilder implements InternalNameProvider {
     @SuppressWarnings("unchecked")
     private void maybeOptimizeRepartitionOperations() {
         maybeUpdateKeyChangingRepartitionNodeMap();
+        final Iterator<Entry<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode>>> entryIterator =  keyChangingOperationsToOptimizableRepartitionNodes.entrySet().iterator();
 
-        for (final Map.Entry<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode>> entry : keyChangingOperationsToOptimizableRepartitionNodes.entrySet()) {
+        while (entryIterator.hasNext()) {
+            final Map.Entry<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode>> entry = entryIterator.next();
 
             final StreamsGraphNode keyChangingNode = entry.getKey();
 
@@ -368,7 +372,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
             }
 
             keyChangingNode.addChild(optimizedSingleRepartition);
-            keyChangingOperationsToOptimizableRepartitionNodes.remove(entry.getKey());
+            entryIterator.remove();
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
index bd43685..0fecaa2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
@@ -17,15 +17,21 @@
 
 package org.apache.kafka.streams.kstream.internals.graph;
 
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.junit.Test;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -68,6 +74,33 @@ public class StreamsGraphTest {
     }
 
     @Test
+    public void shouldBeAbleToProcessNestedMultipleKeyChangingNodes() {
+        final Properties properties = new Properties();
+        properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test-application");
+        properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<String, String> inputStream = builder.stream("inputTopic");
+
+        final KStream<String, String> changedKeyStream = inputStream.selectKey((k, v) -> v.substring(0, 5));
+
+        // first repartition
+        changedKeyStream.groupByKey(Grouped.as("count-repartition"))
+            .count(Materialized.as("count-store"))
+            .toStream().to("count-topic", Produced.with(Serdes.String(), Serdes.Long()));
+
+        // second repartition
+        changedKeyStream.groupByKey(Grouped.as("windowed-repartition"))
+            .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
+            .count(Materialized.as("windowed-count-store"))
+            .toStream()
+            .map((k, v) -> KeyValue.pair(k.key(), v)).to("windowed-count", Produced.with(Serdes.String(), Serdes.Long()));
+
+        builder.build(properties);
+    }
+
+    @Test
     public void shouldNotOptimizeWithValueOrKeyChangingOperatorsAfterInitialKeyChange() {
 
         final Topology attemptedOptimize = getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.OPTIMIZE);