You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2020/09/30 19:06:52 UTC

[kafka] branch trunk updated: KAFKA-10205: Documentation and handling of non deterministic Topologies (#9064)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d1c82a9  KAFKA-10205: Documentation and handling of non deterministic Topologies (#9064)
d1c82a9 is described below

commit d1c82a9baf5f3a0d3bde3ec35157358f7a62ecc8
Author: Igor Soarez <so...@apple.com>
AuthorDate: Wed Sep 30 20:06:11 2020 +0100

    KAFKA-10205: Documentation and handling of non deterministic Topologies (#9064)
    
    Reviewers: John Roesler <jo...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
 .../org/apache/kafka/streams/StreamsBuilder.java   |  8 +++++
 .../streams/processor/internals/StreamTask.java    |  9 +++++
 .../processor/internals/StreamTaskTest.java        | 41 ++++++++++++++++++++++
 3 files changed, 58 insertions(+)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index bcfa745..62f7b94 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -51,6 +51,14 @@ import java.util.regex.Pattern;
 /**
  * {@code StreamsBuilder} provide the high-level Kafka Streams DSL to specify a Kafka Streams topology.
  *
+ * <p>
+ * It is a requirement that the processing logic ({@link Topology}) be defined in a deterministic way,
+ * as in, the order in which all operators are added must be predictable and the same across all application
+ * instances.
+ * Topologies are only identical if all operators are added in the same order.
+ * If different {@link KafkaStreams} instances of the same application build different topologies the result may be
+ * incompatible runtime code and unexpected results or errors
+ *
  * @see Topology
  * @see KStream
  * @see KTable
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 271724c..e09f33c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -31,6 +31,7 @@ import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
+import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
@@ -1109,6 +1110,14 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
 
         public RecordQueue createQueue(final TopicPartition partition) {
             final SourceNode<?, ?, ?, ?> source = topology.source(partition.topic());
+            if (source == null) {
+                throw new TopologyException(
+                        "Topic is unkown to the topology. " +
+                                "This may happen if different KafkaStreams instances of the same application execute different Topologies. " +
+                                "Note that Topologies are only identical if all operators are added in the same order."
+                );
+            }
+
             final TimestampExtractor sourceTimestampExtractor = source.getTimestampExtractor();
             final TimestampExtractor timestampExtractor = sourceTimestampExtractor != null ? sourceTimestampExtractor : defaultTimestampExtractor;
             return new RecordQueue(
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index d0c5804..229df65 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -46,6 +46,7 @@ import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
+import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
@@ -2032,6 +2033,46 @@ public class StreamTaskTest {
         assertThat(task.state(), equalTo(SUSPENDED));
     }
 
+    @Test
+    public void shouldThrowTopologyExceptionIfTaskCreatedForUnknownTopic() {
+        final InternalProcessorContext context = new ProcessorContextImpl(
+                taskId,
+                createConfig(false, "100"),
+                stateManager,
+                streamsMetrics,
+                null
+        );
+        final StreamsMetricsImpl metrics = new StreamsMetricsImpl(this.metrics, "test", StreamsConfig.METRICS_LATEST, time);
+        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet());
+        EasyMock.replay(stateManager);
+
+        // The processor topology is missing the topics
+        final ProcessorTopology topology = withSources(asList(), mkMap());
+
+        final TopologyException  exception = assertThrows(
+                TopologyException.class,
+                () -> new StreamTask(
+                        taskId,
+                        partitions,
+                        topology,
+                        consumer,
+                        createConfig(false, "100"),
+                        metrics,
+                        stateDirectory,
+                        cache,
+                        time,
+                        stateManager,
+                        recordCollector,
+                        context
+                )
+            );
+
+        assertThat(exception.getMessage(), equalTo("Invalid topology: " +
+                "Topic is unkown to the topology. This may happen if different KafkaStreams instances of the same " +
+                "application execute different Topologies. Note that Topologies are only identical if all operators " +
+                "are added in the same order."));
+    }
+
     private List<MetricName> getTaskMetrics() {
         return metrics.metrics().keySet().stream().filter(m -> m.tags().containsKey("task-id")).collect(Collectors.toList());
     }