You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/05/01 16:07:57 UTC
[kafka] branch trunk updated: KAFKA-8298: Fix possible concurrent
modification exception (#6643)
This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ba1fc21 KAFKA-8298: Fix possible concurrent modification exception (#6643)
ba1fc21 is described below
commit ba1fc21864bbd4c1374695890f98ff1fa2614504
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Wed May 1 12:07:45 2019 -0400
KAFKA-8298: Fix possible concurrent modification exception (#6643)
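When processing multiple key-changing operations during the optimization phase, a ConcurrentModificationException is possible, because entries were removed directly from the keyChangingOperationsToOptimizableRepartitionNodes map while its entry set was being iterated. The loop now uses an explicit Iterator and removes each processed entry via Iterator.remove().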
Reviewers: Guozhang Wang <wa...@gmail.com>
---
.../kstream/internals/InternalStreamsBuilder.java | 8 ++++--
.../kstream/internals/graph/StreamsGraphTest.java | 33 ++++++++++++++++++++++
2 files changed, 39 insertions(+), 2 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 960b030..e7a7678 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -42,9 +42,11 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Properties;
@@ -311,8 +313,10 @@ public class InternalStreamsBuilder implements InternalNameProvider {
@SuppressWarnings("unchecked")
private void maybeOptimizeRepartitionOperations() {
maybeUpdateKeyChangingRepartitionNodeMap();
+ final Iterator<Entry<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode>>> entryIterator = keyChangingOperationsToOptimizableRepartitionNodes.entrySet().iterator();
- for (final Map.Entry<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode>> entry : keyChangingOperationsToOptimizableRepartitionNodes.entrySet()) {
+ while (entryIterator.hasNext()) {
+ final Map.Entry<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode>> entry = entryIterator.next();
final StreamsGraphNode keyChangingNode = entry.getKey();
@@ -368,7 +372,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
}
keyChangingNode.addChild(optimizedSingleRepartition);
- keyChangingOperationsToOptimizableRepartitionNodes.remove(entry.getKey());
+ entryIterator.remove();
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
index bd43685..0fecaa2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
@@ -17,15 +17,21 @@
package org.apache.kafka.streams.kstream.internals.graph;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.junit.Test;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -68,6 +74,33 @@ public class StreamsGraphTest {
}
@Test
+ public void shouldBeAbleToProcessNestedMultipleKeyChangingNodes() {
+ final Properties properties = new Properties();
+ properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test-application");
+ properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
+
+ final StreamsBuilder builder = new StreamsBuilder();
+ final KStream<String, String> inputStream = builder.stream("inputTopic");
+
+ final KStream<String, String> changedKeyStream = inputStream.selectKey((k, v) -> v.substring(0, 5));
+
+ // first repartition
+ changedKeyStream.groupByKey(Grouped.as("count-repartition"))
+ .count(Materialized.as("count-store"))
+ .toStream().to("count-topic", Produced.with(Serdes.String(), Serdes.Long()));
+
+ // second repartition
+ changedKeyStream.groupByKey(Grouped.as("windowed-repartition"))
+ .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
+ .count(Materialized.as("windowed-count-store"))
+ .toStream()
+ .map((k, v) -> KeyValue.pair(k.key(), v)).to("windowed-count", Produced.with(Serdes.String(), Serdes.Long()));
+
+ builder.build(properties);
+ }
+
+ @Test
public void shouldNotOptimizeWithValueOrKeyChangingOperatorsAfterInitialKeyChange() {
final Topology attemptedOptimize = getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.OPTIMIZE);