You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2020/09/30 19:06:52 UTC
[kafka] branch trunk updated: KAFKA-10205: Documentation and
handling of non deterministic Topologies (#9064)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d1c82a9 KAFKA-10205: Documentation and handling of non deterministic Topologies (#9064)
d1c82a9 is described below
commit d1c82a9baf5f3a0d3bde3ec35157358f7a62ecc8
Author: Igor Soarez <so...@apple.com>
AuthorDate: Wed Sep 30 20:06:11 2020 +0100
KAFKA-10205: Documentation and handling of non deterministic Topologies (#9064)
Reviewers: John Roesler <jo...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
.../org/apache/kafka/streams/StreamsBuilder.java | 8 +++++
.../streams/processor/internals/StreamTask.java | 9 +++++
.../processor/internals/StreamTaskTest.java | 41 ++++++++++++++++++++++
3 files changed, 58 insertions(+)
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index bcfa745..62f7b94 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -51,6 +51,14 @@ import java.util.regex.Pattern;
/**
* {@code StreamsBuilder} provide the high-level Kafka Streams DSL to specify a Kafka Streams topology.
*
+ * <p>
+ * The processing logic ({@link Topology}) must be defined deterministically; that is,
+ * the order in which all operators are added must be predictable and identical across all application
+ * instances.
+ * Topologies are only identical if all operators are added in the same order.
+ * If different {@link KafkaStreams} instances of the same application build different topologies, the instances
+ * may execute incompatible runtime code, leading to unexpected results or errors.
+ *
* @see Topology
* @see KStream
* @see KTable
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 271724c..e09f33c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -31,6 +31,7 @@ import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
+import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
@@ -1109,6 +1110,14 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
public RecordQueue createQueue(final TopicPartition partition) {
final SourceNode<?, ?, ?, ?> source = topology.source(partition.topic());
+ if (source == null) {
+ throw new TopologyException(
+ "Topic is unkown to the topology. " +
+ "This may happen if different KafkaStreams instances of the same application execute different Topologies. " +
+ "Note that Topologies are only identical if all operators are added in the same order."
+ );
+ }
+
final TimestampExtractor sourceTimestampExtractor = source.getTimestampExtractor();
final TimestampExtractor timestampExtractor = sourceTimestampExtractor != null ? sourceTimestampExtractor : defaultTimestampExtractor;
return new RecordQueue(
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index d0c5804..229df65 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -46,6 +46,7 @@ import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
+import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
@@ -2032,6 +2033,46 @@ public class StreamTaskTest {
assertThat(task.state(), equalTo(SUSPENDED));
}
+ @Test
+ public void shouldThrowTopologyExceptionIfTaskCreatedForUnknownTopic() {
+ final InternalProcessorContext context = new ProcessorContextImpl(
+ taskId,
+ createConfig(false, "100"),
+ stateManager,
+ streamsMetrics,
+ null
+ );
+ final StreamsMetricsImpl metrics = new StreamsMetricsImpl(this.metrics, "test", StreamsConfig.METRICS_LATEST, time); // local deliberately shadows the field this.metrics, which it wraps
+ EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet());
+ EasyMock.replay(stateManager);
+
+ // Build a topology with no sources (empty source list and mapping), so the topics of the task's partitions are unknown to it
+ final ProcessorTopology topology = withSources(asList(), mkMap());
+
+ final TopologyException exception = assertThrows( // presumably the StreamTask constructor calls createQueue per partition, so construction itself must throw
+ TopologyException.class,
+ () -> new StreamTask(
+ taskId,
+ partitions,
+ topology,
+ consumer,
+ createConfig(false, "100"),
+ metrics,
+ stateDirectory,
+ cache,
+ time,
+ stateManager,
+ recordCollector,
+ context
+ )
+ );
+
+ assertThat(exception.getMessage(), equalTo("Invalid topology: " + // prefix presumably added by TopologyException; the rest must match StreamTask#createQueue's literal verbatim, including its "unkown" spelling
+ "Topic is unkown to the topology. This may happen if different KafkaStreams instances of the same " +
+ "application execute different Topologies. Note that Topologies are only identical if all operators " +
+ "are added in the same order."));
+ }
+
private List<MetricName> getTaskMetrics() {
return metrics.metrics().keySet().stream().filter(m -> m.tags().containsKey("task-id")).collect(Collectors.toList());
}