You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/06/15 03:27:22 UTC

[kafka] branch 2.0 updated: KAFKA-7055: Update InternalTopologyBuilder to throw TopologyException if a processor or sink is added with no upstream node attached (#5215)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 1bcd351  KAFKA-7055: Update InternalTopologyBuilder to throw TopologyException if a processor or sink is added with no upstream node attached (#5215)
1bcd351 is described below

commit 1bcd35183d8cd0b009bc1b8d592b5f24a0095890
Author: nixsticks <ni...@gmail.com>
AuthorDate: Thu Jun 14 23:26:01 2018 -0400

    KAFKA-7055: Update InternalTopologyBuilder to throw TopologyException if a processor or sink is added with no upstream node attached (#5215)
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <gu...@confluent.io>, Bill Bejeck <bi...@confluent.io>
---
 .../streams/kstream/internals/KStreamImpl.java     |  1 -
 .../internals/InternalTopologyBuilder.java         | 28 +++++++--------
 .../processor/internals/ProcessorContextImpl.java  |  3 +-
 .../org/apache/kafka/streams/TopologyTest.java     | 41 +++++++++++++++++++++-
 .../internals/InternalTopologyBuilderTest.java     | 24 ++++++++++++-
 .../processor/internals/StreamThreadTest.java      |  4 +--
 .../kafka/streams/TopologyTestDriverTest.java      |  2 +-
 7 files changed, 81 insertions(+), 22 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 7356aff..e7dabbf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -580,7 +580,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         final String name = builder.newProcessorName(leftJoin ? LEFTJOIN_NAME : JOIN_NAME);
         builder.internalTopologyBuilder.addProcessor(name, new KStreamKTableJoin<>(((KTableImpl<K, ?, V1>) other).valueGetterSupplier(), joiner, leftJoin), this.name);
         builder.internalTopologyBuilder.connectProcessorAndStateStores(name, ((KTableImpl) other).valueGetterSupplier().storeNames());
-        builder.internalTopologyBuilder.connectProcessors(this.name, ((KTableImpl<K, ?, V1>) other).name);
 
         return new KStreamImpl<>(builder, name, allSourceNodes, false);
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 36a2edc..5b4b4d73 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -442,6 +442,11 @@ public class InternalTopologyBuilder {
                                      final String... predecessorNames) {
         Objects.requireNonNull(name, "name must not be null");
         Objects.requireNonNull(topic, "topic must not be null");
+        Objects.requireNonNull(predecessorNames, "predecessor names must not be null");
+        if (predecessorNames.length == 0) {
+            throw new TopologyException("Sink " + name + " must have at least one parent");
+        }
+
         addSink(name, new StaticTopicNameExtractor<K, V>(topic), keySerializer, valSerializer, partitioner, predecessorNames);
         nodeToSinkTopic.put(name, topic);
     }
@@ -454,9 +459,13 @@ public class InternalTopologyBuilder {
                                      final String... predecessorNames) {
         Objects.requireNonNull(name, "name must not be null");
         Objects.requireNonNull(topicExtractor, "topic extractor must not be null");
+        Objects.requireNonNull(predecessorNames, "predecessor names must not be null");
         if (nodeFactories.containsKey(name)) {
             throw new TopologyException("Processor " + name + " is already added.");
         }
+        if (predecessorNames.length == 0) {
+            throw new TopologyException("Sink " + name + " must have at least one parent");
+        }
 
         for (final String predecessor : predecessorNames) {
             Objects.requireNonNull(predecessor, "predecessor name can't be null");
@@ -481,9 +490,13 @@ public class InternalTopologyBuilder {
                                    final String... predecessorNames) {
         Objects.requireNonNull(name, "name must not be null");
         Objects.requireNonNull(supplier, "supplier must not be null");
+        Objects.requireNonNull(predecessorNames, "predecessor names must not be null");
         if (nodeFactories.containsKey(name)) {
             throw new TopologyException("Processor " + name + " is already added.");
         }
+        if (predecessorNames.length == 0) {
+            throw new TopologyException("Processor " + name + " must have at least one parent");
+        }
 
         for (final String predecessor : predecessorNames) {
             Objects.requireNonNull(predecessor, "predecessor name must not be null");
@@ -592,21 +605,6 @@ public class InternalTopologyBuilder {
         storeToSourceChangelogTopic.put(storeBuilder, topic);
     }
 
-    public final void connectProcessors(final String... processorNames) {
-        if (processorNames.length < 2) {
-            throw new TopologyException("At least two processors need to participate in the connection.");
-        }
-
-        for (final String processorName : processorNames) {
-            Objects.requireNonNull(processorName, "processor name can't be null");
-            if (!nodeFactories.containsKey(processorName)) {
-                throw new TopologyException("Processor " + processorName + " is not added yet.");
-            }
-        }
-
-        nodeGrouper.unite(processorNames[0], Arrays.copyOfRange(processorNames, 1, processorNames.length));
-    }
-
     public final void addInternalTopic(final String topicName) {
         Objects.requireNonNull(topicName, "topicName can't be null");
         internalTopicNames.add(topicName);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index a539a1b..f1ee81f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -116,7 +116,8 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
             if (sendTo != null) {
                 final ProcessorNode child = currentNode().getChild(sendTo);
                 if (child == null) {
-                    throw new StreamsException("Unknown processor name: " + sendTo);
+                    throw new StreamsException("Unknown downstream node: " + sendTo + " either does not exist or is not" +
+                            " connected to this processor.");
                 }
                 forward(child, key, value);
             } else {
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 8b47885..ece157c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -174,6 +174,24 @@ public class TopologyTest {
         } catch (final TopologyException expected) { }
     }
 
+    @Test
+    public void shouldNotAllowToAddProcessorWithEmptyParents() {
+        topology.addSource("source", "topic-1");
+        try {
+            topology.addProcessor("processor", new MockProcessorSupplier());
+            fail("Should throw TopologyException for processor without at least one parent node");
+        } catch (final TopologyException expected) { }
+    }
+
+    @Test
+    public void shouldNotAllowToAddProcessorWithNullParents() {
+        topology.addSource("source", "topic-1");
+        try {
+            topology.addProcessor("processor", new MockProcessorSupplier(), null);
+            fail("Should throw NullPointerException for processor when null parent names are provided");
+        } catch (final NullPointerException expected) { }
+    }
+
     @Test(expected = TopologyException.class)
     public void shouldFailOnUnknownSource() {
         topology.addProcessor("processor", new MockProcessorSupplier(), "source");
@@ -194,6 +212,26 @@ public class TopologyTest {
         } catch (final TopologyException expected) { }
     }
 
+    @Test
+    public void shouldNotAllowToAddSinkWithEmptyParents() {
+        topology.addSource("source", "topic-1");
+        topology.addProcessor("processor", new MockProcessorSupplier(), "source");
+        try {
+            topology.addSink("sink", "topic-2");
+            fail("Should throw TopologyException for sink without at least one parent node");
+        } catch (final TopologyException expected) { }
+    }
+
+    @Test
+    public void shouldNotAllowToAddSinkWithNullParents() {
+        topology.addSource("source", "topic-1");
+        topology.addProcessor("processor", new MockProcessorSupplier(), "source");
+        try {
+            topology.addSink("sink", "topic-2", null);
+            fail("Should throw NullPointerException for sink when null parent names are provided");
+        } catch (final NullPointerException expected) { }
+    }
+
     @Test(expected = TopologyException.class)
     public void shouldFailWithUnknownParent() {
         topology.addSink("sink", "topic-2", "source");
@@ -236,7 +274,8 @@ public class TopologyTest {
     public void shouldNotAllowToAddStateStoreToSink() {
         mockStoreBuilder();
         EasyMock.replay(storeBuilder);
-        topology.addSink("sink-1", "topic-1");
+        topology.addSource("source-1", "topic-1");
+        topology.addSink("sink-1", "topic-1", "source-1");
         try {
             topology.addStateStore(storeBuilder, "sink-1");
             fail("Should have thrown TopologyException for adding store to sink node");
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index 1da0425..b0674ea 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -159,6 +159,16 @@ public class InternalTopologyBuilderTest {
         builder.addProcessor("processor", new MockProcessorSupplier(), "processor");
     }
 
+    @Test(expected = TopologyException.class)
+    public void testAddProcessorWithEmptyParents() {
+        builder.addProcessor("processor", new MockProcessorSupplier());
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testAddProcessorWithNullParents() {
+        builder.addProcessor("processor", new MockProcessorSupplier(), null);
+    }
+
     @Test
     public void testAddSinkWithSameName() {
         builder.addSource(null, "source", null, null, null, "topic-1");
@@ -179,6 +189,17 @@ public class InternalTopologyBuilderTest {
         builder.addSink("sink", "topic-2", null, null, null, "sink");
     }
 
+
+    @Test(expected = TopologyException.class)
+    public void testAddSinkWithEmptyParents() {
+        builder.addSink("sink", "topic", null, null, null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testAddSinkWithNullParents() {
+        builder.addSink("sink", "topic", null, null, null, null);
+    }
+
     @Test
     public void testAddSinkConnectedWithParent() {
         builder.addSource(null, "source", null, null, null, "source-topic");
@@ -275,7 +296,8 @@ public class InternalTopologyBuilderTest {
 
     @Test
     public void testAddStateStoreWithSink() {
-        builder.addSink("sink-1", "topic-1", null, null, null);
+        builder.addSource(null, "source-1", null, null, null, "topic-1");
+        builder.addSink("sink-1", "topic-1", null, null, null, "source-1");
         try {
             builder.addStateStore(storeBuilder, "sink-1");
             fail("Should throw TopologyException with store cannot be added to sink");
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 3412c62..513d1c0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -597,7 +597,7 @@ public class StreamThreadTest {
     @Test
     public void shouldNotNullPointerWhenStandbyTasksAssignedAndNoStateStoresForTopology() {
         internalTopologyBuilder.addSource(null, "name", null, null, null, "topic");
-        internalTopologyBuilder.addSink("out", "output", null, null, null);
+        internalTopologyBuilder.addSink("out", "output", null, null, null, "name");
 
         final StreamThread thread = createStreamThread(clientId, config, false);
 
@@ -690,7 +690,7 @@ public class StreamThreadTest {
     @Test
     public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerGotFencedAtBeginTransactionWhenTaskIsResumed() {
         internalTopologyBuilder.addSource(null, "name", null, null, null, topic1);
-        internalTopologyBuilder.addSink("out", "output", null, null, null);
+        internalTopologyBuilder.addSink("out", "output", null, null, null, "name");
 
         final StreamThread thread = createStreamThread(clientId, new StreamsConfig(configProps(true)), true);
 
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 135fb3f..d0d4ed1 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -707,7 +707,7 @@ public class TopologyTestDriverTest {
     @Test
     public void shouldReturnAllStores() {
         final Topology topology = setupSourceSinkTopology();
-        topology.addProcessor("processor", () -> null);
+        topology.addProcessor("processor", () -> null, "source");
         topology.addStateStore(
             new KeyValueStoreBuilder<>(
                 Stores.inMemoryKeyValueStore("store"),

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.