You are viewing a plain text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@kafka.apache.org by ab...@apache.org on 2020/11/25 02:21:56 UTC
[kafka] branch 2.7 updated: KAFKA-10758: ProcessorTopology should only consider its own nodes when updating regex source topics (#9648)
This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new 8a39712 KAFKA-10758: ProcessorTopology should only consider its own nodes when updating regex source topics (#9648)
8a39712 is described below
commit 8a397124d3d87990243590d7cef983ff97aa2ed4
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Tue Nov 24 17:59:29 2020 -0800
KAFKA-10758: ProcessorTopology should only consider its own nodes when updating regex source topics (#9648)
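When a ProcessorTopology updates its source topics after a change in the topic metadata, it receives the source-topic mapping for the whole topology. It should ignore any source nodes that aren't part of its own subtopology. Previously, it instead threw an IllegalStateException because that full set of source nodes did not match its own.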
Reviewers: Bruno Cadonna <ca...@confluent.io>, Matthias J. Sax <mj...@confluent.io>
---
.../streams/processor/internals/AbstractTask.java | 4 +--
.../processor/internals/ProcessorTopology.java | 32 +++++++++++++---------
.../streams/processor/internals/StreamTask.java | 4 +--
.../kafka/streams/processor/internals/Task.java | 2 +-
.../streams/processor/internals/TaskManager.java | 2 +-
.../integration/RegexSourceIntegrationTest.java | 22 ++++++++++++---
.../processor/internals/ProcessorTopologyTest.java | 16 +++++++++++
.../processor/internals/TaskManagerTest.java | 2 +-
8 files changed, 60 insertions(+), 24 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index ecf5ec1..cb3b894 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -142,9 +142,9 @@ public abstract class AbstractTask implements Task {
}
@Override
- public void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics) {
+ public void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> allTopologyNodesToSourceTopics) {
this.inputPartitions = topicPartitions;
- topology.updateSourceTopics(nodeToSourceTopics);
+ topology.updateSourceTopics(allTopologyNodesToSourceTopics);
}
void maybeInitTaskTimeoutOrThrow(final long currentWallClockMs,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
index 4ee3c0e..c4821c2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
@@ -149,24 +149,30 @@ public class ProcessorTopology {
return false;
}
- public void updateSourceTopics(final Map<String, List<String>> sourceTopicsByName) {
- if (!sourceTopicsByName.keySet().equals(sourceNodesByName.keySet())) {
- log.error("Set of source nodes do not match: \n" +
- "sourceNodesByName = {}\n" +
- "sourceTopicsByName = {}",
- sourceNodesByName.keySet(), sourceTopicsByName.keySet());
- throw new IllegalStateException("Tried to update source topics but source nodes did not match");
- }
+ public void updateSourceTopics(final Map<String, List<String>> allSourceTopicsByNodeName) {
sourceNodesByTopic.clear();
- for (final Map.Entry<String, List<String>> sourceEntry : sourceTopicsByName.entrySet()) {
- final String nodeName = sourceEntry.getKey();
- for (final String topic : sourceEntry.getValue()) {
+ for (final Map.Entry<String, SourceNode<?, ?, ?, ?>> sourceNodeEntry : sourceNodesByName.entrySet()) {
+ final String sourceNodeName = sourceNodeEntry.getKey();
+ final SourceNode<?, ?, ?, ?> sourceNode = sourceNodeEntry.getValue();
+
+ final List<String> updatedSourceTopics = allSourceTopicsByNodeName.get(sourceNodeName);
+ if (updatedSourceTopics == null) {
+ log.error("Unable to find source node {} in updated topics map {}",
+ sourceNodeName, allSourceTopicsByNodeName);
+ throw new IllegalStateException("Node " + sourceNodeName + " not found in full topology");
+ }
+
+ log.trace("Updating source node {} with new topics {}", sourceNodeName, updatedSourceTopics);
+ for (final String topic : updatedSourceTopics) {
if (sourceNodesByTopic.containsKey(topic)) {
+ log.error("Tried to subscribe topic {} to two nodes when updating topics from {}",
+ topic, allSourceTopicsByNodeName);
throw new IllegalStateException("Topic " + topic + " was already registered to source node "
- + sourceNodesByTopic.get(topic).name());
+ + sourceNodesByTopic.get(topic).name());
}
- sourceNodesByTopic.put(topic, sourceNodesByName.get(nodeName));
+ sourceNodesByTopic.put(topic, sourceNode);
}
+
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 107463b..74ec01d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -501,8 +501,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
}
@Override
- public void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics) {
- super.update(topicPartitions, nodeToSourceTopics);
+ public void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> allTopologyNodesToSourceTopics) {
+ super.update(topicPartitions, allTopologyNodesToSourceTopics);
partitionGroup.updatePartitions(topicPartitions, recordQueueCreator::createQueue);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
index 20a0685..6b00e41 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
@@ -153,7 +153,7 @@ public interface Task {
/**
* Updates input partitions and topology after rebalance
*/
- void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics);
+ void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> allTopologyNodesToSourceTopics);
/**
* Attempt a clean close but do not close the underlying state
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 5cd7447..d0c6a1e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -434,7 +434,7 @@ public class TaskManager {
private void updateInputPartitionsAndResume(final Task task, final Set<TopicPartition> topicPartitions) {
final boolean requiresUpdate = !task.inputPartitions().equals(topicPartitions);
if (requiresUpdate) {
- log.trace("Update task {} inputPartitions: current {}, new {}", task, task.inputPartitions(), topicPartitions);
+ log.debug("Update task {} inputPartitions: current {}, new {}", task, task.inputPartitions(), topicPartitions);
for (final TopicPartition inputPartition : task.inputPartitions()) {
partitionToTask.remove(inputPartition);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index c80708e..3906b6f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
@@ -54,6 +55,7 @@ import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import java.io.IOException;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -66,9 +68,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
/**
* End-to-end integration test based on using regex and named topics for creating sources, using
@@ -188,7 +192,7 @@ public class RegexSourceIntegrationTest {
}
@Test
- public void testRegexRecordsAreProcessedAfterReassignment() throws Exception {
+ public void testRegexRecordsAreProcessedAfterNewTopicCreatedWithMultipleSubtopologies() throws Exception {
final String topic1 = "TEST-TOPIC-1";
final String topic2 = "TEST-TOPIC-2";
@@ -197,9 +201,19 @@ public class RegexSourceIntegrationTest {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
- pattern1Stream.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
- streams = new KafkaStreams(builder.build(), streamsConfiguration);
- streams.start();
+ final KStream<String, String> otherStream = builder.stream(Pattern.compile("not-a-match"));
+
+ pattern1Stream
+ .selectKey((k, v) -> k)
+ .groupByKey()
+ .aggregate(() -> "", (k, v, a) -> v)
+ .toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
+
+ final Topology topology = builder.build();
+ assertThat(topology.describe().subtopologies().size(), greaterThan(1));
+ streams = new KafkaStreams(topology, streamsConfiguration);
+
+ startApplicationAndWaitUntilRunning(Collections.singletonList(streams), Duration.ofSeconds(30));
CLUSTER.createTopic(topic2);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index b69ae87..1523c90 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -59,6 +59,8 @@ import java.util.Set;
import java.util.function.Supplier;
import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
@@ -173,6 +175,20 @@ public class ProcessorTopologyTest {
}
@Test
+ public void shouldUpdateSourceTopicsOnlyForSourceNodesWithinTheSubtopology() {
+ topology.addSource("source-1", "topic-1");
+ final ProcessorTopology processorTopology = topology.getInternalBuilder("X").buildTopology();
+
+ processorTopology.updateSourceTopics(mkMap(
+ mkEntry("source-1", Collections.singletonList("topic-1")),
+ mkEntry("source-2", Collections.singletonList("topic-2")))
+ );
+
+ assertNull(processorTopology.source("topic-2"));
+ assertThat(processorTopology.sources().size(), equalTo(1));
+ }
+
+ @Test
public void testDrivingSimpleTopology() {
final int partition = 10;
driver = new TopologyTestDriver(createSimpleTopology(partition), props);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 49a87b6..01f365d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -2845,7 +2845,7 @@ public class TaskManagerTest {
}
@Override
- public void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics) {
+ public void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> allTopologyNodesToSourceTopics) {
inputPartitions = topicPartitions;
}