You are viewing a plain-text version of this content. The link to the canonical version ("here") was not preserved in this copy.
Posted to commits@kafka.apache.org by ab...@apache.org on 2020/11/25 02:50:32 UTC
[kafka] branch 2.6 updated: KAFKA-10758: ProcessorTopology should only consider its own nodes when updating regex source topics (#9648)
This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push:
new 6da2de7 KAFKA-10758: ProcessorTopology should only consider its own nodes when updating regex source topics (#9648)
6da2de7 is described below
commit 6da2de704ffac5961cc0c11c34663330b9ad94c3
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Tue Nov 24 17:59:29 2020 -0800
KAFKA-10758: ProcessorTopology should only consider its own nodes when updating regex source topics (#9648)
We should ignore any source nodes that aren't part of the ProcessorTopology's subtopology when updating its source topics after a change in the topic metadata.
Reviewers: Bruno Cadonna <ca...@confluent.io>, Matthias J. Sax <mj...@confluent.io>
---
.../streams/processor/internals/AbstractTask.java | 4 +-
.../processor/internals/ProcessorTopology.java | 32 ++++++++-----
.../streams/processor/internals/StreamTask.java | 4 +-
.../kafka/streams/processor/internals/Task.java | 2 +-
.../streams/processor/internals/TaskManager.java | 2 +-
.../integration/RegexSourceIntegrationTest.java | 56 ++++++++++++++--------
.../processor/internals/ProcessorTopologyTest.java | 16 +++++++
.../processor/internals/TaskManagerTest.java | 2 +-
8 files changed, 79 insertions(+), 39 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 044825a..c37b64c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -104,8 +104,8 @@ public abstract class AbstractTask implements Task {
}
@Override
- public void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics) {
+ public void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> allTopologyNodesToSourceTopics) {
this.inputPartitions = topicPartitions;
- topology.updateSourceTopics(nodeToSourceTopics);
+ topology.updateSourceTopics(allTopologyNodesToSourceTopics);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
index 7a3bb85..a056143 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
@@ -149,24 +149,30 @@ public class ProcessorTopology {
return false;
}
- public void updateSourceTopics(final Map<String, List<String>> sourceTopicsByName) {
- if (!sourceTopicsByName.keySet().equals(sourceNodesByName.keySet())) {
- log.error("Set of source nodes do not match: \n" +
- "sourceNodesByName = {}\n" +
- "sourceTopicsByName = {}",
- sourceNodesByName.keySet(), sourceTopicsByName.keySet());
- throw new IllegalStateException("Tried to update source topics but source nodes did not match");
- }
+ public void updateSourceTopics(final Map<String, List<String>> allSourceTopicsByNodeName) {
sourceNodesByTopic.clear();
- for (final Map.Entry<String, List<String>> sourceEntry : sourceTopicsByName.entrySet()) {
- final String nodeName = sourceEntry.getKey();
- for (final String topic : sourceEntry.getValue()) {
+ for (final Map.Entry<String, SourceNode<?, ?>> sourceNodeEntry : sourceNodesByName.entrySet()) {
+ final String sourceNodeName = sourceNodeEntry.getKey();
+ final SourceNode<?, ?> sourceNode = sourceNodeEntry.getValue();
+
+ final List<String> updatedSourceTopics = allSourceTopicsByNodeName.get(sourceNodeName);
+ if (updatedSourceTopics == null) {
+ log.error("Unable to find source node {} in updated topics map {}",
+ sourceNodeName, allSourceTopicsByNodeName);
+ throw new IllegalStateException("Node " + sourceNodeName + " not found in full topology");
+ }
+
+ log.trace("Updating source node {} with new topics {}", sourceNodeName, updatedSourceTopics);
+ for (final String topic : updatedSourceTopics) {
if (sourceNodesByTopic.containsKey(topic)) {
+ log.error("Tried to subscribe topic {} to two nodes when updating topics from {}",
+ topic, allSourceTopicsByNodeName);
throw new IllegalStateException("Topic " + topic + " was already registered to source node "
- + sourceNodesByTopic.get(topic).name());
+ + sourceNodesByTopic.get(topic).name());
}
- sourceNodesByTopic.put(topic, sourceNodesByName.get(nodeName));
+ sourceNodesByTopic.put(topic, sourceNode);
}
+
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 2f4ee5c..5ed6e52 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -509,8 +509,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
}
@Override
- public void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics) {
- super.update(topicPartitions, nodeToSourceTopics);
+ public void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> allTopologyNodesToSourceTopics) {
+ super.update(topicPartitions, allTopologyNodesToSourceTopics);
partitionGroup.updatePartitions(topicPartitions, recordQueueCreator::createQueue);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
index b6ed143..a234d13 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
@@ -152,7 +152,7 @@ public interface Task {
/**
* Updates input partitions and topology after rebalance
*/
- void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics);
+ void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> allTopologyNodesToSourceTopics);
/**
* Attempt a clean close but do not close the underlying state
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index a1eb2fa..2001c91 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -387,7 +387,7 @@ public class TaskManager {
private void updateInputPartitionsAndResume(final Task task, final Set<TopicPartition> topicPartitions) {
final boolean requiresUpdate = !task.inputPartitions().equals(topicPartitions);
if (requiresUpdate) {
- log.trace("Update task {} inputPartitions: current {}, new {}", task, task.inputPartitions(), topicPartitions);
+ log.debug("Update task {} inputPartitions: current {}, new {}", task, task.inputPartitions(), topicPartitions);
for (final TopicPartition inputPartition : task.inputPartitions()) {
partitionToTask.remove(inputPartition);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index 3e205f7..8dae58d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
@@ -54,6 +55,7 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.IOException;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -66,9 +68,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
/**
* End-to-end integration test based on using regex and named topics for creating sources, using
@@ -184,41 +188,55 @@ public class RegexSourceIntegrationTest {
}
@Test
- public void testRegexRecordsAreProcessedAfterReassignment() throws Exception {
+ public void testRegexRecordsAreProcessedAfterNewTopicCreatedWithMultipleSubtopologies() throws Exception {
final String topic1 = "TEST-TOPIC-1";
- CLUSTER.createTopic(topic1);
-
- final StreamsBuilder builder = new StreamsBuilder();
- final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
- pattern1Stream.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
- streams = new KafkaStreams(builder.build(), streamsConfiguration);
final String topic2 = "TEST-TOPIC-2";
- streams.start();
- CLUSTER.createTopic(topic2);
+ try {
+ CLUSTER.createTopic(topic1);
+
+ final StreamsBuilder builder = new StreamsBuilder();
+ final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
+ final KStream<String, String> otherStream = builder.stream(Pattern.compile("not-a-match"));
+
+ pattern1Stream
+ .selectKey((k, v) -> k)
+ .groupByKey()
+ .aggregate(() -> "", (k, v, a) -> v)
+ .toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
- final KeyValue<String, String> record1 = new KeyValue<>("1", "1");
- final KeyValue<String, String> record2 = new KeyValue<>("2", "2");
- IntegrationTestUtils.produceKeyValuesSynchronously(
+ final Topology topology = builder.build();
+ assertThat(topology.describe().subtopologies().size(), greaterThan(1));
+ streams = new KafkaStreams(topology, streamsConfiguration);
+
+ startApplicationAndWaitUntilRunning(Collections.singletonList(streams), Duration.ofSeconds(30));
+
+ CLUSTER.createTopic(topic2);
+
+ final KeyValue<String, String> record1 = new KeyValue<>("1", "1");
+ final KeyValue<String, String> record2 = new KeyValue<>("2", "2");
+ IntegrationTestUtils.produceKeyValuesSynchronously(
topic1,
Collections.singletonList(record1),
TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class),
CLUSTER.time
- );
- IntegrationTestUtils.produceKeyValuesSynchronously(
+ );
+ IntegrationTestUtils.produceKeyValuesSynchronously(
topic2,
Collections.singletonList(record2),
TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class),
CLUSTER.time
- );
- IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
+ );
+ IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class),
outputTopic,
Arrays.asList(record1, record2)
- );
+ );
- streams.close();
- CLUSTER.deleteTopicsAndWait(topic1, topic2);
+ streams.close();
+ } finally {
+ CLUSTER.deleteTopicsAndWait(topic1, topic2);
+ }
}
private String createTopic(final int suffix) throws InterruptedException {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 9f852cc..98c8e99 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -59,6 +59,8 @@ import java.util.Set;
import java.util.function.Supplier;
import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
@@ -173,6 +175,20 @@ public class ProcessorTopologyTest {
}
@Test
+ public void shouldUpdateSourceTopicsOnlyForSourceNodesWithinTheSubtopology() {
+ topology.addSource("source-1", "topic-1");
+ final ProcessorTopology processorTopology = topology.getInternalBuilder("X").buildTopology();
+
+ processorTopology.updateSourceTopics(mkMap(
+ mkEntry("source-1", Collections.singletonList("topic-1")),
+ mkEntry("source-2", Collections.singletonList("topic-2")))
+ );
+
+ assertNull(processorTopology.source("topic-2"));
+ assertThat(processorTopology.sources().size(), equalTo(1));
+ }
+
+ @Test
public void testDrivingSimpleTopology() {
final int partition = 10;
driver = new TopologyTestDriver(createSimpleTopology(partition), props);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 2c8b740..444437a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -2813,7 +2813,7 @@ public class TaskManagerTest {
}
@Override
- public void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics) {
+ public void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> allTopologyNodesToSourceTopics) {
inputPartitions = topicPartitions;
}