You are viewing a plain-text version of this content. The link to the canonical version (originally attached to the word "here") was lost in the plain-text conversion.
Posted to commits@kafka.apache.org by ab...@apache.org on 2020/11/25 02:50:32 UTC

[kafka] branch 2.6 updated: KAFKA-10758: ProcessorTopology should only consider its own nodes when updating regex source topics (#9648)

This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 6da2de7  KAFKA-10758: ProcessorTopology should only consider its own nodes when updating regex source topics (#9648)
6da2de7 is described below

commit 6da2de704ffac5961cc0c11c34663330b9ad94c3
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Tue Nov 24 17:59:29 2020 -0800

    KAFKA-10758: ProcessorTopology should only consider its own nodes when updating regex source topics (#9648)
    
     We should ignore any source nodes that aren't part of the ProcessorTopology's subtopology when updating its source topics after a change in the topic metadata.
    
    Reviewers: Bruno Cadonna <ca...@confluent.io>, Matthias J. Sax <mj...@confluent.io>
---
 .../streams/processor/internals/AbstractTask.java  |  4 +-
 .../processor/internals/ProcessorTopology.java     | 32 ++++++++-----
 .../streams/processor/internals/StreamTask.java    |  4 +-
 .../kafka/streams/processor/internals/Task.java    |  2 +-
 .../streams/processor/internals/TaskManager.java   |  2 +-
 .../integration/RegexSourceIntegrationTest.java    | 56 ++++++++++++++--------
 .../processor/internals/ProcessorTopologyTest.java | 16 +++++++
 .../processor/internals/TaskManagerTest.java       |  2 +-
 8 files changed, 79 insertions(+), 39 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 044825a..c37b64c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -104,8 +104,8 @@ public abstract class AbstractTask implements Task {
     }
 
     @Override
-    public void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics) {
+    public void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> allTopologyNodesToSourceTopics) {
         this.inputPartitions = topicPartitions;
-        topology.updateSourceTopics(nodeToSourceTopics);
+        topology.updateSourceTopics(allTopologyNodesToSourceTopics);
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
index 7a3bb85..a056143 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
@@ -149,24 +149,30 @@ public class ProcessorTopology {
         return false;
     }
 
-    public void updateSourceTopics(final Map<String, List<String>> sourceTopicsByName) {
-        if (!sourceTopicsByName.keySet().equals(sourceNodesByName.keySet())) {
-            log.error("Set of source nodes do not match: \n" +
-                "sourceNodesByName = {}\n" +
-                "sourceTopicsByName = {}",
-                sourceNodesByName.keySet(), sourceTopicsByName.keySet());
-            throw new IllegalStateException("Tried to update source topics but source nodes did not match");
-        }
+    public void updateSourceTopics(final Map<String, List<String>> allSourceTopicsByNodeName) {
         sourceNodesByTopic.clear();
-        for (final Map.Entry<String, List<String>> sourceEntry : sourceTopicsByName.entrySet()) {
-            final String nodeName = sourceEntry.getKey();
-            for (final String topic : sourceEntry.getValue()) {
+        for (final Map.Entry<String, SourceNode<?, ?>> sourceNodeEntry : sourceNodesByName.entrySet()) {
+            final String sourceNodeName = sourceNodeEntry.getKey();
+            final SourceNode<?, ?> sourceNode = sourceNodeEntry.getValue();
+
+            final List<String> updatedSourceTopics = allSourceTopicsByNodeName.get(sourceNodeName);
+            if (updatedSourceTopics == null) {
+                log.error("Unable to find source node {} in updated topics map {}",
+                          sourceNodeName, allSourceTopicsByNodeName);
+                throw new IllegalStateException("Node " + sourceNodeName + " not found in full topology");
+            }
+
+            log.trace("Updating source node {} with new topics {}", sourceNodeName, updatedSourceTopics);
+            for (final String topic : updatedSourceTopics) {
                 if (sourceNodesByTopic.containsKey(topic)) {
+                    log.error("Tried to subscribe topic {} to two nodes when updating topics from {}",
+                              topic, allSourceTopicsByNodeName);
                     throw new IllegalStateException("Topic " + topic + " was already registered to source node "
-                        + sourceNodesByTopic.get(topic).name());
+                                                        + sourceNodesByTopic.get(topic).name());
                 }
-                sourceNodesByTopic.put(topic, sourceNodesByName.get(nodeName));
+                sourceNodesByTopic.put(topic, sourceNode);
             }
+
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 2f4ee5c..5ed6e52 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -509,8 +509,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
     }
 
     @Override
-    public void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics) {
-        super.update(topicPartitions, nodeToSourceTopics);
+    public void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> allTopologyNodesToSourceTopics) {
+        super.update(topicPartitions, allTopologyNodesToSourceTopics);
         partitionGroup.updatePartitions(topicPartitions, recordQueueCreator::createQueue);
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
index b6ed143..a234d13 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
@@ -152,7 +152,7 @@ public interface Task {
     /**
      * Updates input partitions and topology after rebalance
      */
-    void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics);
+    void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> allTopologyNodesToSourceTopics);
 
     /**
      * Attempt a clean close but do not close the underlying state
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index a1eb2fa..2001c91 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -387,7 +387,7 @@ public class TaskManager {
     private void updateInputPartitionsAndResume(final Task task, final Set<TopicPartition> topicPartitions) {
         final boolean requiresUpdate = !task.inputPartitions().equals(topicPartitions);
         if (requiresUpdate) {
-            log.trace("Update task {} inputPartitions: current {}, new {}", task, task.inputPartitions(), topicPartitions);
+            log.debug("Update task {} inputPartitions: current {}, new {}", task, task.inputPartitions(), topicPartitions);
             for (final TopicPartition inputPartition : task.inputPartitions()) {
                 partitionToTask.remove(inputPartition);
             }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index 3e205f7..8dae58d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
@@ -54,6 +55,7 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -66,9 +68,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
 
 /**
  * End-to-end integration test based on using regex and named topics for creating sources, using
@@ -184,41 +188,55 @@ public class RegexSourceIntegrationTest {
     }
 
     @Test
-    public void testRegexRecordsAreProcessedAfterReassignment() throws Exception {
+    public void testRegexRecordsAreProcessedAfterNewTopicCreatedWithMultipleSubtopologies() throws Exception {
         final String topic1 = "TEST-TOPIC-1";
-        CLUSTER.createTopic(topic1);
-
-        final StreamsBuilder builder = new StreamsBuilder();
-        final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
-        pattern1Stream.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
-        streams = new KafkaStreams(builder.build(), streamsConfiguration);
         final String topic2 = "TEST-TOPIC-2";
-        streams.start();
 
-        CLUSTER.createTopic(topic2);
+        try {
+            CLUSTER.createTopic(topic1);
+
+            final StreamsBuilder builder = new StreamsBuilder();
+            final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
+            final KStream<String, String> otherStream = builder.stream(Pattern.compile("not-a-match"));
+
+            pattern1Stream
+                .selectKey((k, v) -> k)
+                .groupByKey()
+                .aggregate(() -> "", (k, v, a) -> v)
+                .toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
 
-        final KeyValue<String, String> record1 = new KeyValue<>("1", "1");
-        final KeyValue<String, String> record2 = new KeyValue<>("2", "2");
-        IntegrationTestUtils.produceKeyValuesSynchronously(
+            final Topology topology = builder.build();
+            assertThat(topology.describe().subtopologies().size(), greaterThan(1));
+            streams = new KafkaStreams(topology, streamsConfiguration);
+
+            startApplicationAndWaitUntilRunning(Collections.singletonList(streams), Duration.ofSeconds(30));
+
+            CLUSTER.createTopic(topic2);
+
+            final KeyValue<String, String> record1 = new KeyValue<>("1", "1");
+            final KeyValue<String, String> record2 = new KeyValue<>("2", "2");
+            IntegrationTestUtils.produceKeyValuesSynchronously(
                 topic1,
                 Collections.singletonList(record1),
                 TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class),
                 CLUSTER.time
-        );
-        IntegrationTestUtils.produceKeyValuesSynchronously(
+            );
+            IntegrationTestUtils.produceKeyValuesSynchronously(
                 topic2,
                 Collections.singletonList(record2),
                 TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class),
                 CLUSTER.time
-        );
-        IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
+            );
+            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
                 TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class),
                 outputTopic,
                 Arrays.asList(record1, record2)
-        );
+            );
 
-        streams.close();
-        CLUSTER.deleteTopicsAndWait(topic1, topic2);
+            streams.close();
+        } finally {
+            CLUSTER.deleteTopicsAndWait(topic1, topic2);
+        }
     }
 
     private String createTopic(final int suffix) throws InterruptedException {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 9f852cc..98c8e99 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -59,6 +59,8 @@ import java.util.Set;
 import java.util.function.Supplier;
 
 import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -173,6 +175,20 @@ public class ProcessorTopologyTest {
     }
 
     @Test
+    public void shouldUpdateSourceTopicsOnlyForSourceNodesWithinTheSubtopology() {
+        topology.addSource("source-1", "topic-1");
+        final ProcessorTopology processorTopology = topology.getInternalBuilder("X").buildTopology();
+
+        processorTopology.updateSourceTopics(mkMap(
+            mkEntry("source-1", Collections.singletonList("topic-1")),
+            mkEntry("source-2", Collections.singletonList("topic-2")))
+        );
+
+        assertNull(processorTopology.source("topic-2"));
+        assertThat(processorTopology.sources().size(), equalTo(1));
+    }
+
+    @Test
     public void testDrivingSimpleTopology() {
         final int partition = 10;
         driver = new TopologyTestDriver(createSimpleTopology(partition), props);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 2c8b740..444437a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -2813,7 +2813,7 @@ public class TaskManagerTest {
         }
 
         @Override
-        public void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics) {
+        public void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> allTopologyNodesToSourceTopics) {
             inputPartitions = topicPartitions;
         }