You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/06/15 03:27:22 UTC
[kafka] branch 2.0 updated: KAFKA-7055: Update
InternalTopologyBuilder to throw TopologyException if a processor or sink
is added with no upstream node attached (#5215)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new 1bcd351 KAFKA-7055: Update InternalTopologyBuilder to throw TopologyException if a processor or sink is added with no upstream node attached (#5215)
1bcd351 is described below
commit 1bcd35183d8cd0b009bc1b8d592b5f24a0095890
Author: nixsticks <ni...@gmail.com>
AuthorDate: Thu Jun 14 23:26:01 2018 -0400
KAFKA-7055: Update InternalTopologyBuilder to throw TopologyException if a processor or sink is added with no upstream node attached (#5215)
Reviewers: Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <gu...@confluent.io>, Bill Bejeck <bi...@confluent.io>
---
.../streams/kstream/internals/KStreamImpl.java | 1 -
.../internals/InternalTopologyBuilder.java | 28 +++++++--------
.../processor/internals/ProcessorContextImpl.java | 3 +-
.../org/apache/kafka/streams/TopologyTest.java | 41 +++++++++++++++++++++-
.../internals/InternalTopologyBuilderTest.java | 24 ++++++++++++-
.../processor/internals/StreamThreadTest.java | 4 +--
.../kafka/streams/TopologyTestDriverTest.java | 2 +-
7 files changed, 81 insertions(+), 22 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 7356aff..e7dabbf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -580,7 +580,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
final String name = builder.newProcessorName(leftJoin ? LEFTJOIN_NAME : JOIN_NAME);
builder.internalTopologyBuilder.addProcessor(name, new KStreamKTableJoin<>(((KTableImpl<K, ?, V1>) other).valueGetterSupplier(), joiner, leftJoin), this.name);
builder.internalTopologyBuilder.connectProcessorAndStateStores(name, ((KTableImpl) other).valueGetterSupplier().storeNames());
- builder.internalTopologyBuilder.connectProcessors(this.name, ((KTableImpl<K, ?, V1>) other).name);
return new KStreamImpl<>(builder, name, allSourceNodes, false);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 36a2edc..5b4b4d73 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -442,6 +442,11 @@ public class InternalTopologyBuilder {
final String... predecessorNames) {
Objects.requireNonNull(name, "name must not be null");
Objects.requireNonNull(topic, "topic must not be null");
+ Objects.requireNonNull(predecessorNames, "predecessor names must not be null");
+ if (predecessorNames.length == 0) {
+ throw new TopologyException("Sink " + name + " must have at least one parent");
+ }
+
addSink(name, new StaticTopicNameExtractor<K, V>(topic), keySerializer, valSerializer, partitioner, predecessorNames);
nodeToSinkTopic.put(name, topic);
}
@@ -454,9 +459,13 @@ public class InternalTopologyBuilder {
final String... predecessorNames) {
Objects.requireNonNull(name, "name must not be null");
Objects.requireNonNull(topicExtractor, "topic extractor must not be null");
+ Objects.requireNonNull(predecessorNames, "predecessor names must not be null");
if (nodeFactories.containsKey(name)) {
throw new TopologyException("Processor " + name + " is already added.");
}
+ if (predecessorNames.length == 0) {
+ throw new TopologyException("Sink " + name + " must have at least one parent");
+ }
for (final String predecessor : predecessorNames) {
Objects.requireNonNull(predecessor, "predecessor name can't be null");
@@ -481,9 +490,13 @@ public class InternalTopologyBuilder {
final String... predecessorNames) {
Objects.requireNonNull(name, "name must not be null");
Objects.requireNonNull(supplier, "supplier must not be null");
+ Objects.requireNonNull(predecessorNames, "predecessor names must not be null");
if (nodeFactories.containsKey(name)) {
throw new TopologyException("Processor " + name + " is already added.");
}
+ if (predecessorNames.length == 0) {
+ throw new TopologyException("Processor " + name + " must have at least one parent");
+ }
for (final String predecessor : predecessorNames) {
Objects.requireNonNull(predecessor, "predecessor name must not be null");
@@ -592,21 +605,6 @@ public class InternalTopologyBuilder {
storeToSourceChangelogTopic.put(storeBuilder, topic);
}
- public final void connectProcessors(final String... processorNames) {
- if (processorNames.length < 2) {
- throw new TopologyException("At least two processors need to participate in the connection.");
- }
-
- for (final String processorName : processorNames) {
- Objects.requireNonNull(processorName, "processor name can't be null");
- if (!nodeFactories.containsKey(processorName)) {
- throw new TopologyException("Processor " + processorName + " is not added yet.");
- }
- }
-
- nodeGrouper.unite(processorNames[0], Arrays.copyOfRange(processorNames, 1, processorNames.length));
- }
-
public final void addInternalTopic(final String topicName) {
Objects.requireNonNull(topicName, "topicName can't be null");
internalTopicNames.add(topicName);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index a539a1b..f1ee81f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -116,7 +116,8 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
if (sendTo != null) {
final ProcessorNode child = currentNode().getChild(sendTo);
if (child == null) {
- throw new StreamsException("Unknown processor name: " + sendTo);
+ throw new StreamsException("Unknown downstream node: " + sendTo + " either does not exist or is not" +
+ " connected to this processor.");
}
forward(child, key, value);
} else {
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 8b47885..ece157c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -174,6 +174,24 @@ public class TopologyTest {
} catch (final TopologyException expected) { }
}
+ @Test
+ public void shouldNotAllowToAddProcessorWithEmptyParents() {
+ topology.addSource("source", "topic-1");
+ try {
+ topology.addProcessor("processor", new MockProcessorSupplier());
+ fail("Should throw TopologyException for processor without at least one parent node");
+ } catch (final TopologyException expected) { }
+ }
+
+ @Test
+ public void shouldNotAllowToAddProcessorWithNullParents() {
+ topology.addSource("source", "topic-1");
+ try {
+ topology.addProcessor("processor", new MockProcessorSupplier(), null);
+ fail("Should throw NullPointerException for processor when null parent names are provided");
+ } catch (final NullPointerException expected) { }
+ }
+
@Test(expected = TopologyException.class)
public void shouldFailOnUnknownSource() {
topology.addProcessor("processor", new MockProcessorSupplier(), "source");
@@ -194,6 +212,26 @@ public class TopologyTest {
} catch (final TopologyException expected) { }
}
+ @Test
+ public void shouldNotAllowToAddSinkWithEmptyParents() {
+ topology.addSource("source", "topic-1");
+ topology.addProcessor("processor", new MockProcessorSupplier(), "source");
+ try {
+ topology.addSink("sink", "topic-2");
+ fail("Should throw TopologyException for sink without at least one parent node");
+ } catch (final TopologyException expected) { }
+ }
+
+ @Test
+ public void shouldNotAllowToAddSinkWithNullParents() {
+ topology.addSource("source", "topic-1");
+ topology.addProcessor("processor", new MockProcessorSupplier(), "source");
+ try {
+ topology.addSink("sink", "topic-2", null);
+ fail("Should throw NullPointerException for sink when null parent names are provided");
+ } catch (final NullPointerException expected) { }
+ }
+
@Test(expected = TopologyException.class)
public void shouldFailWithUnknownParent() {
topology.addSink("sink", "topic-2", "source");
@@ -236,7 +274,8 @@ public class TopologyTest {
public void shouldNotAllowToAddStateStoreToSink() {
mockStoreBuilder();
EasyMock.replay(storeBuilder);
- topology.addSink("sink-1", "topic-1");
+ topology.addSource("source-1", "topic-1");
+ topology.addSink("sink-1", "topic-1", "source-1");
try {
topology.addStateStore(storeBuilder, "sink-1");
fail("Should have thrown TopologyException for adding store to sink node");
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index 1da0425..b0674ea 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -159,6 +159,16 @@ public class InternalTopologyBuilderTest {
builder.addProcessor("processor", new MockProcessorSupplier(), "processor");
}
+ @Test(expected = TopologyException.class)
+ public void testAddProcessorWithEmptyParents() {
+ builder.addProcessor("processor", new MockProcessorSupplier());
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testAddProcessorWithNullParents() {
+ builder.addProcessor("processor", new MockProcessorSupplier(), null);
+ }
+
@Test
public void testAddSinkWithSameName() {
builder.addSource(null, "source", null, null, null, "topic-1");
@@ -179,6 +189,17 @@ public class InternalTopologyBuilderTest {
builder.addSink("sink", "topic-2", null, null, null, "sink");
}
+
+ @Test(expected = TopologyException.class)
+ public void testAddSinkWithEmptyParents() {
+ builder.addSink("sink", "topic", null, null, null);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testAddSinkWithNullParents() {
+ builder.addSink("sink", "topic", null, null, null, null);
+ }
+
@Test
public void testAddSinkConnectedWithParent() {
builder.addSource(null, "source", null, null, null, "source-topic");
@@ -275,7 +296,8 @@ public class InternalTopologyBuilderTest {
@Test
public void testAddStateStoreWithSink() {
- builder.addSink("sink-1", "topic-1", null, null, null);
+ builder.addSource(null, "source-1", null, null, null, "topic-1");
+ builder.addSink("sink-1", "topic-1", null, null, null, "source-1");
try {
builder.addStateStore(storeBuilder, "sink-1");
fail("Should throw TopologyException with store cannot be added to sink");
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 3412c62..513d1c0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -597,7 +597,7 @@ public class StreamThreadTest {
@Test
public void shouldNotNullPointerWhenStandbyTasksAssignedAndNoStateStoresForTopology() {
internalTopologyBuilder.addSource(null, "name", null, null, null, "topic");
- internalTopologyBuilder.addSink("out", "output", null, null, null);
+ internalTopologyBuilder.addSink("out", "output", null, null, null, "name");
final StreamThread thread = createStreamThread(clientId, config, false);
@@ -690,7 +690,7 @@ public class StreamThreadTest {
@Test
public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerGotFencedAtBeginTransactionWhenTaskIsResumed() {
internalTopologyBuilder.addSource(null, "name", null, null, null, topic1);
- internalTopologyBuilder.addSink("out", "output", null, null, null);
+ internalTopologyBuilder.addSink("out", "output", null, null, null, "name");
final StreamThread thread = createStreamThread(clientId, new StreamsConfig(configProps(true)), true);
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 135fb3f..d0d4ed1 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -707,7 +707,7 @@ public class TopologyTestDriverTest {
@Test
public void shouldReturnAllStores() {
final Topology topology = setupSourceSinkTopology();
- topology.addProcessor("processor", () -> null);
+ topology.addProcessor("processor", () -> null, "source");
topology.addStateStore(
new KeyValueStoreBuilder<>(
Stores.inMemoryKeyValueStore("store"),
--
To stop receiving notification emails like this one, please contact
mjsax@apache.org.