You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/05/01 16:44:36 UTC

[kafka] branch 2.1 updated: KAFKA-8298: Fix possible concurrent modification exception (#6643)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new d78e392  KAFKA-8298: Fix possible concurrent modification exception (#6643)
d78e392 is described below

commit d78e3922f7317ca8302e8f54c676f2741a99c0db
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Wed May 1 12:07:45 2019 -0400

    KAFKA-8298: Fix possible concurrent modification exception (#6643)
    
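    When processing multiple key-changing operations during the optimization phase, a ConcurrentModificationException is possible because entries are removed from the key-changing-node map while its entry set is being iterated; entries are now removed through the iterator instead.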
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 .../kstream/internals/InternalStreamsBuilder.java  |  8 ++++--
 .../kstream/internals/graph/StreamsGraphTest.java  | 33 ++++++++++++++++++++++
 2 files changed, 39 insertions(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 20df084..c5e9d04 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -43,9 +43,11 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.PriorityQueue;
 import java.util.Properties;
@@ -305,8 +307,10 @@ public class InternalStreamsBuilder implements InternalNameProvider {
     @SuppressWarnings("unchecked")
     private void maybeOptimizeRepartitionOperations() {
         maybeUpdateKeyChangingRepartitionNodeMap();
+        final Iterator<Entry<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode>>> entryIterator =  keyChangingOperationsToOptimizableRepartitionNodes.entrySet().iterator();
 
-        for (final Map.Entry<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode>> entry : keyChangingOperationsToOptimizableRepartitionNodes.entrySet()) {
+        while (entryIterator.hasNext()) {
+            final Map.Entry<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode>> entry = entryIterator.next();
 
             final StreamsGraphNode keyChangingNode = entry.getKey();
 
@@ -362,7 +366,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
             }
 
             keyChangingNode.addChild(optimizedSingleRepartition);
-            keyChangingOperationsToOptimizableRepartitionNodes.remove(entry.getKey());
+            entryIterator.remove();
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
index bd43685..0fecaa2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
@@ -17,15 +17,21 @@
 
 package org.apache.kafka.streams.kstream.internals.graph;
 
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.junit.Test;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -68,6 +74,33 @@ public class StreamsGraphTest {
     }
 
     @Test
+    public void shouldBeAbleToProcessNestedMultipleKeyChangingNodes() {
+        final Properties properties = new Properties();
+        properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test-application");
+        properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<String, String> inputStream = builder.stream("inputTopic");
+
+        final KStream<String, String> changedKeyStream = inputStream.selectKey((k, v) -> v.substring(0, 5));
+
+        // first repartition
+        changedKeyStream.groupByKey(Grouped.as("count-repartition"))
+            .count(Materialized.as("count-store"))
+            .toStream().to("count-topic", Produced.with(Serdes.String(), Serdes.Long()));
+
+        // second repartition
+        changedKeyStream.groupByKey(Grouped.as("windowed-repartition"))
+            .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
+            .count(Materialized.as("windowed-count-store"))
+            .toStream()
+            .map((k, v) -> KeyValue.pair(k.key(), v)).to("windowed-count", Produced.with(Serdes.String(), Serdes.Long()));
+
+        builder.build(properties);
+    }
+
+    @Test
     public void shouldNotOptimizeWithValueOrKeyChangingOperatorsAfterInitialKeyChange() {
 
         final Topology attemptedOptimize = getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.OPTIMIZE);