You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/07/28 23:46:38 UTC
[1/3] kafka git commit: KAFKA-5670: (KIP-120) Add Topology and
deprecate TopologyBuilder
Repository: kafka
Updated Branches:
refs/heads/trunk c50c941af -> 1844bf2b2
http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
new file mode 100644
index 0000000..c163935
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -0,0 +1,671 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.TopologyBuilderException;
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockStateStoreSupplier;
+import org.apache.kafka.test.ProcessorTopologyTestDriver;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.fail;
+
+public class TopologyTest {
+
+ private final Topology topology = new Topology();
+ private final InternalTopologyBuilder.TopologyDescription expectedDescription = new InternalTopologyBuilder.TopologyDescription();
+
+ /** Adding a source with a {@code null} name and a topic name is expected to raise {@link NullPointerException}. */
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullNameWhenAddingSourceWithTopic() {
+ // the cast gives the null argument the static type String (presumably to select the intended overload)
+ topology.addSource((String) null, "topic");
+ }
+
+ /** Adding a source with a {@code null} name and a topic pattern is expected to raise {@link NullPointerException}. */
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullNameWhenAddingSourceWithPattern() {
+ topology.addSource(null, Pattern.compile(".*"));
+ }
+
+ /** Adding a named source with {@code null} in place of its topics is expected to raise {@link NullPointerException}. */
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullTopicsWhenAddingSoureWithTopic() {
+ // null is typed as String[] here, i.e. it stands for the topic array itself
+ topology.addSource("source", (String[]) null);
+ }
+
+ /** Adding a named source with a {@code null} topic pattern is expected to raise {@link NullPointerException}. */
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullTopicsWhenAddingSourceWithPattern() {
+ // null is typed as Pattern so that the pattern-based variant is addressed
+ topology.addSource("source", (Pattern) null);
+ }
+
+ /** Adding a source without naming any topic is expected to raise {@link TopologyException}. */
+ @Test(expected = TopologyException.class)
+ public void shouldNotAllowZeroTopicsWhenAddingSource() {
+ // only the source name is passed; no topics follow
+ topology.addSource("source");
+ }
+
+    /**
+     * Adding a processor with a {@code null} name is expected to raise {@link NullPointerException}.
+     *
+     * The supplier is a plain {@link MockProcessorSupplier}, as in the other tests of this class,
+     * instead of a raw-typed anonymous wrapper around one.
+     */
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullNameWhenAddingProcessor() {
+        topology.addProcessor(null, new MockProcessorSupplier());
+    }
+
+ /** Adding a named processor with a {@code null} supplier is expected to raise {@link NullPointerException}. */
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullProcessorSupplierWhenAddingProcessor() {
+ topology.addProcessor("name", null);
+ }
+
+ /** Adding a sink with a {@code null} name is expected to raise {@link NullPointerException}. */
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullNameWhenAddingSink() {
+ topology.addSink(null, "topic");
+ }
+
+ /** Adding a named sink with a {@code null} topic is expected to raise {@link NullPointerException}. */
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullTopicWhenAddingSink() {
+ topology.addSink("name", null);
+ }
+
+ /** Connecting a store to a {@code null} processor name is expected to raise {@link NullPointerException}. */
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullProcessorNameWhenConnectingProcessorAndStateStores() {
+ topology.connectProcessorAndStateStores(null, "store");
+ }
+
+ /** Connecting a processor with {@code null} in place of the store names is expected to raise {@link NullPointerException}. */
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAllowNullStoreNameWhenConnectingProcessorAndStateStores() {
+ // null is typed as String[] here, i.e. it stands for the store-name array itself
+ topology.connectProcessorAndStateStores("processor", (String[]) null);
+ }
+
+ /** Connecting a processor without naming any store is expected to raise {@link TopologyException}. */
+ @Test(expected = TopologyException.class)
+ public void shouldNotAllowZeroStoreNameWhenConnectingProcessorAndStateStores() {
+ // only the processor name is passed; no store names follow
+ topology.connectProcessorAndStateStores("processor");
+ }
+
+ /** Adding {@code null} as state store supplier is expected to raise {@link NullPointerException}. */
+ @Test(expected = NullPointerException.class)
+ public void shouldNotAddNullStateStoreSupplier() {
+ topology.addStateStore(null);
+ }
+
+    /** Registering a second source under an already used name must fail with a {@link TopologyException}. */
+    @Test
+    public void shouldNotAllowToAddSourcesWithSameName() {
+        final String duplicatedName = "source";
+        topology.addSource(duplicatedName, "topic-1");
+        try {
+            topology.addSource(duplicatedName, "topic-2");
+            fail("Should throw TopologyException for duplicate source name");
+        } catch (final TopologyException expected) {
+            // expected: the second registration reuses the source name
+        }
+    }
+
+    /** A topic that one source already reads must not be accepted for a second source. */
+    @Test
+    public void shouldNotAllowToAddTopicTwice() {
+        final String sharedTopic = "topic-1";
+        topology.addSource("source", sharedTopic);
+        try {
+            topology.addSource("source-2", sharedTopic);
+            fail("Should throw TopologyException for already used topic");
+        } catch (final TopologyException expected) {
+            // expected: the topic is already registered for the first source
+        }
+    }
+
+    /** A source pattern that matches an already registered topic name must fail with a {@link TopologyException}. */
+    @Test
+    public void testPatternMatchesAlreadyProvidedTopicSource() {
+        final String registeredTopic = "foo";
+        topology.addSource("source-1", registeredTopic);
+        try {
+            final Pattern overlapping = Pattern.compile("f.*");
+            topology.addSource("source-2", overlapping);
+            fail("Should have thrown TopologyException for overlapping pattern with already registered topic");
+        } catch (final TopologyException expected) {
+            // expected: "f.*" covers the topic "foo"
+        }
+    }
+
+    /** A topic name that is matched by an already registered source pattern must fail with a {@link TopologyException}. */
+    @Test
+    public void testNamedTopicMatchesAlreadyProvidedPattern() {
+        final Pattern registeredPattern = Pattern.compile("f.*");
+        topology.addSource("source-1", registeredPattern);
+        try {
+            final String overlappingTopic = "foo";
+            topology.addSource("source-2", overlappingTopic);
+            fail("Should have thrown TopologyException for overlapping topic with already registered pattern");
+        } catch (final TopologyException expected) {
+            // expected: "foo" is covered by the pattern "f.*"
+        }
+    }
+
+    /** Registering a second processor under an already used name must fail with a {@link TopologyException}. */
+    @Test
+    public void shoudNotAllowToAddProcessorWithSameName() {
+        final String parentName = "source";
+        final String duplicatedName = "processor";
+        topology.addSource(parentName, "topic-1");
+        topology.addProcessor(duplicatedName, new MockProcessorSupplier(), parentName);
+        try {
+            topology.addProcessor(duplicatedName, new MockProcessorSupplier(), parentName);
+            fail("Should throw TopologyException for duplicate processor name");
+        } catch (final TopologyException expected) {
+            // expected: the processor name is already taken
+        }
+    }
+
+    /** A processor whose parent was never added to the topology is expected to raise {@link TopologyException}. */
+    @Test(expected = TopologyException.class)
+    public void shouldFailOnUnknownSource() {
+        final String unknownParent = "source";
+        topology.addProcessor("processor", new MockProcessorSupplier(), unknownParent);
+    }
+
+    /** A processor that names itself as parent is expected to raise {@link TopologyException}. */
+    @Test(expected = TopologyException.class)
+    public void shouldFailIfNodeIsItsOwnParent() {
+        final String selfReferencingName = "processor";
+        topology.addProcessor(selfReferencingName, new MockProcessorSupplier(), selfReferencingName);
+    }
+
+    /** Registering a second sink under an already used name must fail with a {@link TopologyException}. */
+    @Test
+    public void shouldNotAllowToAddSinkWithSameName() {
+        final String parentName = "source";
+        final String duplicatedName = "sink";
+        topology.addSource(parentName, "topic-1");
+        topology.addSink(duplicatedName, "topic-2", parentName);
+        try {
+            topology.addSink(duplicatedName, "topic-3", parentName);
+            fail("Should throw TopologyException for duplicate sink name");
+        } catch (final TopologyException expected) {
+            // expected: the sink name is already taken
+        }
+    }
+
+    /** A sink whose parent was never added to the topology is expected to raise {@link TopologyException}. */
+    @Test(expected = TopologyException.class)
+    public void shouldFailWithUnknownParent() {
+        final String unknownParent = "source";
+        topology.addSink("sink", "topic-2", unknownParent);
+    }
+
+    /** A sink that names itself as parent is expected to raise {@link TopologyException}. */
+    @Test(expected = TopologyException.class)
+    public void shouldFailIfSinkIsItsOwnParent() {
+        final String selfReferencingName = "sink";
+        topology.addSink(selfReferencingName, "topic-2", selfReferencingName);
+    }
+
+    /** Using an existing sink as the parent of another sink must fail with a {@link TopologyException}. */
+    @Test
+    public void shouldFailIfSinkIsParent() {
+        final String firstSink = "sink-1";
+        topology.addSource("source", "topic-1");
+        topology.addSink(firstSink, "topic-2", "source");
+        try {
+            topology.addSink("sink-2", "topic-3", firstSink);
+            fail("Should throw TopologyException for using sink as parent");
+        } catch (final TopologyException expected) {
+            // expected: "sink-1" is a sink and is used as a parent here
+        }
+    }
+
+    /** Attaching a state store to a processor name that was never added is expected to raise {@link TopologyException}. */
+    @Test(expected = TopologyException.class)
+    public void shouldNotAllowToAddStateStoreToNonExistingProcessor() {
+        final MockStateStoreSupplier storeSupplier = new MockStateStoreSupplier("store", false);
+        topology.addStateStore(storeSupplier, "no-such-processsor");
+    }
+
+    /** Attaching a state store to a source node must fail with a {@link TopologyException}. */
+    @Test
+    public void shouldNotAllowToAddStateStoreToSource() {
+        final String sourceName = "source-1";
+        topology.addSource(sourceName, "topic-1");
+        try {
+            final MockStateStoreSupplier storeSupplier = new MockStateStoreSupplier("store", false);
+            topology.addStateStore(storeSupplier, sourceName);
+            fail("Should have thrown TopologyException for adding store to source node");
+        } catch (final TopologyException expected) {
+            // expected: the store is being attached to a source node
+        }
+    }
+
+    /** Attaching a state store to a sink node must fail with a {@link TopologyException}. */
+    @Test
+    public void shouldNotAllowToAddStateStoreToSink() {
+        final String sinkName = "sink-1";
+        topology.addSink(sinkName, "topic-1");
+        try {
+            final MockStateStoreSupplier storeSupplier = new MockStateStoreSupplier("store", false);
+            topology.addStateStore(storeSupplier, sinkName);
+            fail("Should have thrown TopologyException for adding store to sink node");
+        } catch (final TopologyException expected) {
+            // expected: the store is being attached to a sink node
+        }
+    }
+
+    /** Adding a second state store under an already used store name must fail with a {@link TopologyException}. */
+    @Test
+    public void shouldNotAllowToAddStoreWithSameName() {
+        final String duplicatedStoreName = "store";
+        topology.addStateStore(new MockStateStoreSupplier(duplicatedStoreName, false));
+        try {
+            topology.addStateStore(new MockStateStoreSupplier(duplicatedStoreName, false));
+            fail("Should have thrown TopologyException for duplicate store name");
+        } catch (final TopologyException expected) {
+            // expected: the store name is already taken
+        }
+    }
+
+    /**
+     * Builds a topology in which the state store is attached to {@code goodGuy} only, while
+     * {@code badGuy} uses the same {@link LocalMockProcessorSupplier} and therefore also asks for the
+     * store in {@code init}. Creating the test driver is expected to fail with a
+     * {@link StreamsException} whose cause is a {@link TopologyBuilderException} carrying the
+     * "has no access to StateStore" message; that cause is rethrown to satisfy {@code expected}.
+     * Any other {@link StreamsException} is wrapped in a {@link RuntimeException} so the test fails
+     * with the original exception attached.
+     */
+    @Test(expected = TopologyBuilderException.class)
+    public void shouldThroughOnUnassignedStateStoreAccess() throws Exception {
+        final String sourceNodeName = "source";
+        final String goodNodeName = "goodGuy";
+        final String badNodeName = "badGuy";
+
+        final Properties config = new Properties();
+        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1");
+        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
+        final StreamsConfig streamsConfig = new StreamsConfig(config);
+
+        topology
+            .addSource(sourceNodeName, "topic")
+            .addProcessor(goodNodeName, new LocalMockProcessorSupplier(), sourceNodeName)
+            .addStateStore(
+                Stores.create(LocalMockProcessorSupplier.STORE_NAME).withStringKeys().withStringValues().inMemory().build(),
+                goodNodeName)
+            .addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
+
+        final String expectedMessage = "Invalid topology building: Processor " + badNodeName
+            + " has no access to StateStore " + LocalMockProcessorSupplier.STORE_NAME;
+
+        try {
+            new ProcessorTopologyTestDriver(streamsConfig, topology.internalTopologyBuilder);
+        } catch (final StreamsException e) {
+            final Throwable cause = e.getCause();
+            // instanceof is already false for a null cause; comparing from the expected side keeps a
+            // cause without message from raising a NullPointerException that would hide 'e'
+            if (cause instanceof TopologyBuilderException && expectedMessage.equals(cause.getMessage())) {
+                throw (TopologyBuilderException) cause;
+            }
+            throw new RuntimeException("Did expect different exception. Did catch:", e);
+        }
+    }
+
+    /**
+     * Supplies processors that look up the state store named {@link #STORE_NAME} from their
+     * context during {@code init} and do nothing in any other callback.
+     */
+    private static class LocalMockProcessorSupplier implements ProcessorSupplier {
+        static final String STORE_NAME = "store";
+
+        @Override
+        public Processor get() {
+            return new Processor() {
+                @Override
+                public void init(final ProcessorContext processorContext) {
+                    // the store lookup is the only action; the returned store is not used
+                    processorContext.getStateStore(STORE_NAME);
+                }
+
+                @Override
+                public void process(final Object key, final Object value) {
+                    // intentionally empty
+                }
+
+                @Override
+                public void punctuate(final long timestamp) {
+                    // intentionally empty
+                }
+
+                @Override
+                public void close() {
+                    // intentionally empty
+                }
+            };
+        }
+    }
+
+    /** A global store whose source name equals its processor name is expected to raise {@link TopologyException}. */
+    @Test(expected = TopologyException.class)
+    public void shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() {
+        final String sharedNodeName = "sameName";
+        final MockStateStoreSupplier storeSupplier = new MockStateStoreSupplier("anyName", false, false);
+        final MockProcessorSupplier stateUpdateSupplier = new MockProcessorSupplier();
+
+        // same name in the source-name and in the processor-name position
+        topology.addGlobalStore(storeSupplier, sharedNodeName, null, null, "anyTopicName", sharedNodeName, stateUpdateSupplier);
+    }
+
+    /** The description of an untouched topology must equal an untouched expected description. */
+    @Test
+    public void shouldDescribeEmptyTopology() {
+        final TopologyDescription actualDescription = topology.describe();
+        assertThat(actualDescription, equalTo((TopologyDescription) expectedDescription));
+    }
+
+    /** One source reading one topic is expected to be described as a single subtopology with id 0. */
+    @Test
+    public void singleSourceShouldHaveSingleSubtopology() {
+        final TopologyDescription.Source source = addSource("source", "topic");
+
+        final Set<TopologyDescription.Node> nodes = Collections.<TopologyDescription.Node>singleton(source);
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, nodes));
+
+        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+    }
+
+    /** One source reading three topics is expected to be described as a single subtopology with id 0. */
+    @Test
+    public void singleSourceWithListOfTopicsShouldHaveSingleSubtopology() {
+        final TopologyDescription.Source source = addSource("source", "topic1", "topic2", "topic3");
+
+        final Set<TopologyDescription.Node> nodes = Collections.<TopologyDescription.Node>singleton(source);
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, nodes));
+
+        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+    }
+
+    /** One source reading a topic pattern is expected to be described as a single subtopology with id 0. */
+    @Test
+    public void singleSourcePatternShouldHaveSingleSubtopology() {
+        final Pattern topicPattern = Pattern.compile("topic[0-9]");
+        final TopologyDescription.Source source = addSource("source", topicPattern);
+
+        final Set<TopologyDescription.Node> nodes = Collections.<TopologyDescription.Node>singleton(source);
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, nodes));
+
+        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+    }
+
+    /** Three unconnected sources are expected to be described as three subtopologies with ids 0, 1 and 2. */
+    @Test
+    public void multipleSourcesShouldHaveDistinctSubtopologies() {
+        for (int subtopologyId = 0; subtopologyId < 3; ++subtopologyId) {
+            // sources and topics are numbered from 1, subtopology ids from 0
+            final int suffix = subtopologyId + 1;
+            final TopologyDescription.Source source = addSource("source" + suffix, "topic" + suffix);
+            expectedDescription.addSubtopology(
+                new InternalTopologyBuilder.Subtopology(subtopologyId,
+                    Collections.<TopologyDescription.Node>singleton(source)));
+        }
+
+        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+    }
+
+    /** A source with one downstream processor is expected to be described as one subtopology (id 0) holding both nodes. */
+    @Test
+    public void sourceAndProcessorShouldHaveSingleSubtopology() {
+        final TopologyDescription.Source source = addSource("source", "topic");
+        final TopologyDescription.Processor processor = addProcessor("processor", source);
+
+        final Set<TopologyDescription.Node> nodes =
+            new HashSet<>(Arrays.<TopologyDescription.Node>asList(source, processor));
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, nodes));
+
+        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+    }
+
+    /** A source with one processor that owns one new store is expected to be described as one subtopology (id 0). */
+    @Test
+    public void sourceAndProcessorWithStateShouldHaveSingleSubtopology() {
+        final String[] storeNames = {"store"};
+        final TopologyDescription.Source source = addSource("source", "topic");
+        final TopologyDescription.Processor processor = addProcessorWithNewStore("processor", storeNames, source);
+
+        final Set<TopologyDescription.Node> nodes =
+            new HashSet<>(Arrays.<TopologyDescription.Node>asList(source, processor));
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, nodes));
+
+        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+    }
+
+
+    /** A source with one processor that owns two new stores is expected to be described as one subtopology (id 0). */
+    @Test
+    public void sourceAndProcessorWithMultipleStatesShouldHaveSingleSubtopology() {
+        final String[] storeNames = {"store1", "store2"};
+        final TopologyDescription.Source source = addSource("source", "topic");
+        final TopologyDescription.Processor processor = addProcessorWithNewStore("processor", storeNames, source);
+
+        final Set<TopologyDescription.Node> nodes =
+            new HashSet<>(Arrays.<TopologyDescription.Node>asList(source, processor));
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, nodes));
+
+        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+    }
+
+    /** One source feeding two processors is expected to be described as one subtopology (id 0) holding all three nodes. */
+    @Test
+    public void sourceWithMultipleProcessorsShouldHaveSingleSubtopology() {
+        final TopologyDescription.Source source = addSource("source", "topic");
+        final TopologyDescription.Processor firstProcessor = addProcessor("processor1", source);
+        final TopologyDescription.Processor secondProcessor = addProcessor("processor2", source);
+
+        final Set<TopologyDescription.Node> nodes =
+            new HashSet<>(Arrays.<TopologyDescription.Node>asList(source, firstProcessor, secondProcessor));
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, nodes));
+
+        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+    }
+
+    /** Two sources (one by topic, one by pattern) feeding one processor are expected to form one subtopology (id 0). */
+    @Test
+    public void processorWithMultipleSourcesShouldHaveSingleSubtopology() {
+        final TopologyDescription.Source topicSource = addSource("source1", "topic0");
+        final TopologyDescription.Source patternSource = addSource("source2", Pattern.compile("topic[1-9]"));
+        final TopologyDescription.Processor processor = addProcessor("processor", topicSource, patternSource);
+
+        final Set<TopologyDescription.Node> nodes =
+            new HashSet<>(Arrays.<TopologyDescription.Node>asList(topicSource, patternSource, processor));
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, nodes));
+
+        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+    }
+
+    /** Three independent source/processor pairs are expected to be described as subtopologies 0, 1 and 2. */
+    @Test
+    public void multipleSourcesWithProcessorsShouldHaveDistinctSubtopologies() {
+        for (int subtopologyId = 0; subtopologyId < 3; ++subtopologyId) {
+            // node and topic names are numbered from 1, subtopology ids from 0
+            final int suffix = subtopologyId + 1;
+            final TopologyDescription.Source source = addSource("source" + suffix, "topic" + suffix);
+            final TopologyDescription.Processor processor = addProcessor("processor" + suffix, source);
+
+            final Set<TopologyDescription.Node> pairNodes = new HashSet<>();
+            pairNodes.add(source);
+            pairNodes.add(processor);
+            expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(subtopologyId, pairNodes));
+        }
+
+        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+    }
+
+    /** Three independent source/sink pairs are expected to be described as subtopologies 0, 1 and 2. */
+    @Test
+    public void multipleSourcesWithSinksShouldHaveDistinctSubtopologies() {
+        for (int subtopologyId = 0; subtopologyId < 3; ++subtopologyId) {
+            // node and topic names are numbered from 1, subtopology ids from 0
+            final int suffix = subtopologyId + 1;
+            final TopologyDescription.Source source = addSource("source" + suffix, "topic" + suffix);
+            final TopologyDescription.Sink sink = addSink("sink" + suffix, "sinkTopic" + suffix, source);
+
+            final Set<TopologyDescription.Node> pairNodes = new HashSet<>();
+            pairNodes.add(source);
+            pairNodes.add(sink);
+            expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(subtopologyId, pairNodes));
+        }
+
+        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+    }
+
+    /** Three source/processor pairs that all write to one sink are expected to form a single subtopology (id 0). */
+    @Test
+    public void processorsWithSameSinkShouldHaveSameSubtopology() {
+        final TopologyDescription.Source firstSource = addSource("source", "topic");
+        final TopologyDescription.Processor firstProcessor = addProcessor("processor1", firstSource);
+
+        final TopologyDescription.Source secondSource = addSource("source2", "topic2");
+        final TopologyDescription.Processor secondProcessor = addProcessor("processor2", secondSource);
+
+        final TopologyDescription.Source thirdSource = addSource("source3", "topic3");
+        final TopologyDescription.Processor thirdProcessor = addProcessor("processor3", thirdSource);
+
+        // the shared sink is what ties the three pairs together
+        final TopologyDescription.Sink sharedSink =
+            addSink("sink", "sinkTopic", firstProcessor, secondProcessor, thirdProcessor);
+
+        final Set<TopologyDescription.Node> nodes = new HashSet<>(Arrays.<TopologyDescription.Node>asList(
+            firstSource, firstProcessor,
+            secondSource, secondProcessor,
+            thirdSource, thirdProcessor,
+            sharedSink));
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, nodes));
+
+        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+    }
+
+    /**
+     * Two processors each create their own store and a third processor is connected to both
+     * existing stores; all three source/processor pairs are expected to form a single subtopology (id 0).
+     */
+    @Test
+    public void processorsWithSharedStateShouldHaveSameSubtopology() {
+        final String[] firstStore = {"store1"};
+        final String[] secondStore = {"store2"};
+        final String[] sharedStores = {firstStore[0], secondStore[0]};
+
+        final TopologyDescription.Source firstSource = addSource("source", "topic");
+        final TopologyDescription.Processor firstProcessor =
+            addProcessorWithNewStore("processor1", firstStore, firstSource);
+
+        final TopologyDescription.Source secondSource = addSource("source2", "topic2");
+        final TopologyDescription.Processor secondProcessor =
+            addProcessorWithNewStore("processor2", secondStore, secondSource);
+
+        final TopologyDescription.Source thirdSource = addSource("source3", "topic3");
+        final TopologyDescription.Processor thirdProcessor =
+            addProcessorWithExistingStore("processor3", sharedStores, thirdSource);
+
+        final Set<TopologyDescription.Node> nodes = new HashSet<>(Arrays.<TopologyDescription.Node>asList(
+            firstSource, firstProcessor,
+            secondSource, secondProcessor,
+            thirdSource, thirdProcessor));
+        expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, nodes));
+
+        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+    }
+
+    /** A topology holding one global store must be described with exactly that global store. */
+    @Test
+    public void shouldDescribeGlobalStoreTopology() {
+        addGlobalStoreToTopologyAndExpectedDescription("globalStore", "source", "globalTopic", "processor");
+
+        final TopologyDescription actualDescription = topology.describe();
+        assertThat(actualDescription, equalTo((TopologyDescription) expectedDescription));
+    }
+
+    /** A topology holding two global stores must be described with both global stores. */
+    @Test
+    public void shouldDescribeMultipleGlobalStoreTopology() {
+        for (int suffix = 1; suffix <= 2; ++suffix) {
+            addGlobalStoreToTopologyAndExpectedDescription(
+                "globalStore" + suffix,
+                "source" + suffix,
+                "globalTopic" + suffix,
+                "processor" + suffix);
+        }
+
+        assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
+    }
+
+    /**
+     * Adds a source reading the given topics to the topology under test and returns the source
+     * node expected in the description.
+     *
+     * @param sourceName  name of the source node
+     * @param sourceTopic topics to read; at least one is required because the first one is read unconditionally
+     * @return expected source node whose topic text is the topics joined with ", "
+     */
+    private TopologyDescription.Source addSource(final String sourceName,
+                                                 final String... sourceTopic) {
+        topology.addSource(null, sourceName, null, null, null, sourceTopic);
+
+        String joinedTopics = sourceTopic[0];
+        for (final String furtherTopic : Arrays.asList(sourceTopic).subList(1, sourceTopic.length)) {
+            joinedTopics = joinedTopics + ", " + furtherTopic;
+        }
+        return new InternalTopologyBuilder.Source(sourceName, joinedTopics);
+    }
+
+    /**
+     * Adds a source reading all topics matched by the given pattern to the topology under test and
+     * returns the source node expected in the description.
+     *
+     * @param sourceName    name of the source node
+     * @param sourcePattern topic pattern; its {@code toString()} becomes the topic text of the expected node
+     * @return expected source node
+     */
+    private TopologyDescription.Source addSource(final String sourceName,
+                                                 final Pattern sourcePattern) {
+        topology.addSource(null, sourceName, null, null, null, sourcePattern);
+
+        final String patternText = sourcePattern.toString();
+        return new InternalTopologyBuilder.Source(sourceName, patternText);
+    }
+
+    /**
+     * Adds a processor without any state store below the given parents.
+     *
+     * @param processorName name of the processor node
+     * @param parents       expected nodes of the parents
+     * @return expected processor node
+     */
+    private TopologyDescription.Processor addProcessor(final String processorName,
+                                                       final TopologyDescription.Node... parents) {
+        final String[] noStores = new String[0];
+        return addProcessorWithNewStore(processorName, noStores, parents);
+    }
+
+    /**
+     * Adds a processor below the given parents and creates one new state store per given name for it.
+     *
+     * @param processorName name of the processor node
+     * @param storeNames    names of the stores to create
+     * @param parents       expected nodes of the parents
+     * @return expected processor node
+     */
+    private TopologyDescription.Processor addProcessorWithNewStore(final String processorName,
+                                                                   final String[] storeNames,
+                                                                   final TopologyDescription.Node... parents) {
+        final boolean createStores = true;
+        return addProcessorWithStore(processorName, storeNames, createStores, parents);
+    }
+
+    /**
+     * Adds a processor below the given parents and connects it to state stores that are already
+     * part of the topology.
+     *
+     * @param processorName name of the processor node
+     * @param storeNames    names of the existing stores to connect
+     * @param parents       expected nodes of the parents
+     * @return expected processor node
+     */
+    private TopologyDescription.Processor addProcessorWithExistingStore(final String processorName,
+                                                                        final String[] storeNames,
+                                                                        final TopologyDescription.Node... parents) {
+        final boolean createStores = false;
+        return addProcessorWithStore(processorName, storeNames, createStores, parents);
+    }
+
+    /**
+     * Adds a processor to the topology under test, wires its state stores, and builds the matching
+     * expected node linked to its parents in both directions.
+     *
+     * @param processorName name of the processor node
+     * @param storeNames    names of the stores used by the processor
+     * @param newStores     {@code true} to add a new store per name, {@code false} to connect the
+     *                      processor to stores with these names
+     * @param parents       expected nodes of the parents; they receive the new node as successor
+     * @return expected processor node
+     */
+    private TopologyDescription.Processor addProcessorWithStore(final String processorName,
+                                                                final String[] storeNames,
+                                                                final boolean newStores,
+                                                                final TopologyDescription.Node... parents) {
+        final String[] parentNames = new String[parents.length];
+        int next = 0;
+        for (final TopologyDescription.Node parent : parents) {
+            parentNames[next++] = parent.name();
+        }
+
+        topology.addProcessor(processorName, new MockProcessorSupplier(), parentNames);
+        if (!newStores) {
+            topology.connectProcessorAndStateStores(processorName, storeNames);
+        } else {
+            for (final String storeName : storeNames) {
+                topology.addStateStore(new MockStateStoreSupplier(storeName, false), processorName);
+            }
+        }
+
+        final TopologyDescription.Processor expectedProcessorNode
+            = new InternalTopologyBuilder.Processor(processorName, new HashSet<>(Arrays.asList(storeNames)));
+
+        // mirror the parent/child links on the expected nodes
+        for (final TopologyDescription.Node parent : parents) {
+            final InternalTopologyBuilder.AbstractNode parentNode = (InternalTopologyBuilder.AbstractNode) parent;
+            parentNode.addSuccessor(expectedProcessorNode);
+            ((InternalTopologyBuilder.AbstractNode) expectedProcessorNode).addPredecessor(parent);
+        }
+
+        return expectedProcessorNode;
+    }
+
+    /**
+     * Adds a sink to the topology under test and builds the matching expected node linked to its
+     * parents in both directions.
+     *
+     * @param sinkName  name of the sink node
+     * @param sinkTopic topic the sink writes to
+     * @param parents   expected nodes of the parents; they receive the new node as successor
+     * @return expected sink node
+     */
+    private TopologyDescription.Sink addSink(final String sinkName,
+                                             final String sinkTopic,
+                                             final TopologyDescription.Node... parents) {
+        final String[] parentNames = new String[parents.length];
+        int next = 0;
+        for (final TopologyDescription.Node parent : parents) {
+            parentNames[next++] = parent.name();
+        }
+
+        topology.addSink(sinkName, sinkTopic, null, null, null, parentNames);
+
+        final TopologyDescription.Sink expectedSinkNode
+            = new InternalTopologyBuilder.Sink(sinkName, sinkTopic);
+
+        // mirror the parent/child links on the expected nodes
+        for (final TopologyDescription.Node parent : parents) {
+            final InternalTopologyBuilder.AbstractNode parentNode = (InternalTopologyBuilder.AbstractNode) parent;
+            parentNode.addSuccessor(expectedSinkNode);
+            ((InternalTopologyBuilder.AbstractNode) expectedSinkNode).addPredecessor(parent);
+        }
+
+        return expectedSinkNode;
+    }
+
+    /**
+     * Adds a global store to the topology under test and records the matching global store in the
+     * expected description.
+     *
+     * @param globalStoreName name of the global store
+     * @param sourceName      name of the source node feeding the store
+     * @param globalTopicName topic the source reads
+     * @param processorName   name of the processor node updating the store
+     */
+    private void addGlobalStoreToTopologyAndExpectedDescription(final String globalStoreName,
+                                                                final String sourceName,
+                                                                final String globalTopicName,
+                                                                final String processorName) {
+        final MockStateStoreSupplier storeSupplier = new MockStateStoreSupplier(globalStoreName, false, false);
+        final MockProcessorSupplier stateUpdateSupplier = new MockProcessorSupplier();
+        topology.addGlobalStore(
+            storeSupplier, sourceName, null, null, null, globalTopicName, processorName, stateUpdateSupplier);
+
+        final TopologyDescription.GlobalStore expectedGlobalStore =
+            new InternalTopologyBuilder.GlobalStore(sourceName, processorName, globalStoreName, globalTopicName);
+        expectedDescription.addGlobalStore(expectedGlobalStore);
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
index a868839..3fecfc1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.TopologyBuilderException;
+import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
@@ -56,6 +57,7 @@ import java.util.regex.Pattern;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
@Category({IntegrationTest.class})
public class KStreamsFineGrainedAutoResetIntegrationTest {
@@ -238,26 +240,29 @@ public class KStreamsFineGrainedAutoResetIntegrationTest {
consumer.close();
}
- @Test(expected = TopologyBuilderException.class)
+ @Test
public void shouldThrowExceptionOverlappingPattern() throws Exception {
final KStreamBuilder builder = new KStreamBuilder();
//NOTE this would realistically get caught when building topology, the test is for completeness
builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-[A-D]_1"));
builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-[A-D]_1"));
- builder.stream(TOPIC_Y_1, TOPIC_Z_1);
- builder.earliestResetTopicsPattern();
+ try {
+ builder.earliestResetTopicsPattern();
+ fail("Should have thrown TopologyException");
+ } catch (final TopologyException expected) { }
}
- @Test(expected = TopologyBuilderException.class)
+ @Test
public void shouldThrowExceptionOverlappingTopic() throws Exception {
final KStreamBuilder builder = new KStreamBuilder();
//NOTE this would realistically get caught when building topology, the test is for completeness
builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-[A-D]_1"));
- builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-\\d_1"));
- builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, TOPIC_A_1, TOPIC_Z_1);
- builder.latestResetTopicsPattern();
+ try {
+ builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, TOPIC_A_1, TOPIC_Z_1);
+ fail("Should have thrown TopologyBuilderException");
+ } catch (final TopologyBuilderException expected) { }
}
@Test
http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index a7ddb7b..0cd2f2a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -57,9 +57,9 @@ import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+@SuppressWarnings("deprecation")
public class TopologyBuilderTest {
-
@Test
public void shouldAddSourceWithOffsetReset() {
final TopologyBuilder builder = new TopologyBuilder();
http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyTest.java
deleted file mode 100644
index a541eb3..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyTest.java
+++ /dev/null
@@ -1,408 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor;
-
-import org.apache.kafka.streams.TopologyDescription;
-import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.MockStateStoreSupplier;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.regex.Pattern;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-// TODO (remove this comment) Test name ok, we just use InternalTopologyBuilder for now in this test until Topology gets added
-public class TopologyTest {
- // TODO change from InternalTopologyBuilder to Topology
- private final InternalTopologyBuilder topology = new InternalTopologyBuilder();
- private final InternalTopologyBuilder.TopologyDescription expectedDescription = new InternalTopologyBuilder.TopologyDescription();
-
- @Test
- public void shouldDescribeEmptyTopology() {
- assertThat(topology.describe(), equalTo(expectedDescription));
- }
-
- @Test
- public void singleSourceShouldHaveSingleSubtopology() {
- final TopologyDescription.Source expectedSourceNode = addSource("source", "topic");
-
- expectedDescription.addSubtopology(
- new InternalTopologyBuilder.Subtopology(0,
- Collections.<TopologyDescription.Node>singleton(expectedSourceNode)));
-
- assertThat(topology.describe(), equalTo(expectedDescription));
- }
-
- @Test
- public void singleSourceWithListOfTopicsShouldHaveSingleSubtopology() {
- final TopologyDescription.Source expectedSourceNode = addSource("source", "topic1", "topic2", "topic3");
-
- expectedDescription.addSubtopology(
- new InternalTopologyBuilder.Subtopology(0,
- Collections.<TopologyDescription.Node>singleton(expectedSourceNode)));
-
- assertThat(topology.describe(), equalTo(expectedDescription));
- }
-
- @Test
- public void singleSourcePatternShouldHaveSingleSubtopology() {
- final TopologyDescription.Source expectedSourceNode = addSource("source", Pattern.compile("topic[0-9]"));
-
- expectedDescription.addSubtopology(
- new InternalTopologyBuilder.Subtopology(0,
- Collections.<TopologyDescription.Node>singleton(expectedSourceNode)));
-
- assertThat(topology.describe(), equalTo(expectedDescription));
- }
-
- @Test
- public void multipleSourcesShouldHaveDistinctSubtopologies() {
- final TopologyDescription.Source expectedSourceNode1 = addSource("source1", "topic1");
- expectedDescription.addSubtopology(
- new InternalTopologyBuilder.Subtopology(0,
- Collections.<TopologyDescription.Node>singleton(expectedSourceNode1)));
-
- final TopologyDescription.Source expectedSourceNode2 = addSource("source2", "topic2");
- expectedDescription.addSubtopology(
- new InternalTopologyBuilder.Subtopology(1,
- Collections.<TopologyDescription.Node>singleton(expectedSourceNode2)));
-
- final TopologyDescription.Source expectedSourceNode3 = addSource("source3", "topic3");
- expectedDescription.addSubtopology(
- new InternalTopologyBuilder.Subtopology(2,
- Collections.<TopologyDescription.Node>singleton(expectedSourceNode3)));
-
- assertThat(topology.describe(), equalTo(expectedDescription));
- }
-
- @Test
- public void sourceAndProcessorShouldHaveSingleSubtopology() {
- final TopologyDescription.Source expectedSourceNode = addSource("source", "topic");
- final TopologyDescription.Processor expectedProcessorNode = addProcessor("processor", expectedSourceNode);
-
- final Set<TopologyDescription.Node> allNodes = new HashSet<>();
- allNodes.add(expectedSourceNode);
- allNodes.add(expectedProcessorNode);
- expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes));
-
- assertThat(topology.describe(), equalTo(expectedDescription));
- }
-
- @Test
- public void sourceAndProcessorWithStateShouldHaveSingleSubtopology() {
- final TopologyDescription.Source expectedSourceNode = addSource("source", "topic");
- final String[] store = new String[] {"store"};
- final TopologyDescription.Processor expectedProcessorNode
- = addProcessorWithNewStore("processor", store, expectedSourceNode);
-
- final Set<TopologyDescription.Node> allNodes = new HashSet<>();
- allNodes.add(expectedSourceNode);
- allNodes.add(expectedProcessorNode);
- expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes));
-
- assertThat(topology.describe(), equalTo(expectedDescription));
- }
-
-
- @Test
- public void sourceAndProcessorWithMultipleStatesShouldHaveSingleSubtopology() {
- final TopologyDescription.Source expectedSourceNode = addSource("source", "topic");
- final String[] stores = new String[] {"store1", "store2"};
- final TopologyDescription.Processor expectedProcessorNode
- = addProcessorWithNewStore("processor", stores, expectedSourceNode);
-
- final Set<TopologyDescription.Node> allNodes = new HashSet<>();
- allNodes.add(expectedSourceNode);
- allNodes.add(expectedProcessorNode);
- expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes));
-
- assertThat(topology.describe(), equalTo(expectedDescription));
- }
-
- @Test
- public void sourceWithMultipleProcessorsShouldHaveSingleSubtopology() {
- final TopologyDescription.Source expectedSourceNode = addSource("source", "topic");
- final TopologyDescription.Processor expectedProcessorNode1 = addProcessor("processor1", expectedSourceNode);
- final TopologyDescription.Processor expectedProcessorNode2 = addProcessor("processor2", expectedSourceNode);
-
- final Set<TopologyDescription.Node> allNodes = new HashSet<>();
- allNodes.add(expectedSourceNode);
- allNodes.add(expectedProcessorNode1);
- allNodes.add(expectedProcessorNode2);
- expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes));
-
- assertThat(topology.describe(), equalTo(expectedDescription));
- }
-
- @Test
- public void processorWithMultipleSourcesShouldHaveSingleSubtopology() {
- final TopologyDescription.Source expectedSourceNode1 = addSource("source1", "topic0");
- final TopologyDescription.Source expectedSourceNode2 = addSource("source2", Pattern.compile("topic[1-9]"));
- final TopologyDescription.Processor expectedProcessorNode = addProcessor("processor", expectedSourceNode1, expectedSourceNode2);
-
- final Set<TopologyDescription.Node> allNodes = new HashSet<>();
- allNodes.add(expectedSourceNode1);
- allNodes.add(expectedSourceNode2);
- allNodes.add(expectedProcessorNode);
- expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes));
-
- assertThat(topology.describe(), equalTo(expectedDescription));
- }
-
- @Test
- public void multipleSourcesWithProcessorsShouldHaveDistinctSubtopologies() {
- final TopologyDescription.Source expectedSourceNode1 = addSource("source1", "topic1");
- final TopologyDescription.Processor expectedProcessorNode1 = addProcessor("processor1", expectedSourceNode1);
-
- final TopologyDescription.Source expectedSourceNode2 = addSource("source2", "topic2");
- final TopologyDescription.Processor expectedProcessorNode2 = addProcessor("processor2", expectedSourceNode2);
-
- final TopologyDescription.Source expectedSourceNode3 = addSource("source3", "topic3");
- final TopologyDescription.Processor expectedProcessorNode3 = addProcessor("processor3", expectedSourceNode3);
-
- final Set<TopologyDescription.Node> allNodes1 = new HashSet<>();
- allNodes1.add(expectedSourceNode1);
- allNodes1.add(expectedProcessorNode1);
- expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes1));
-
- final Set<TopologyDescription.Node> allNodes2 = new HashSet<>();
- allNodes2.add(expectedSourceNode2);
- allNodes2.add(expectedProcessorNode2);
- expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(1, allNodes2));
-
- final Set<TopologyDescription.Node> allNodes3 = new HashSet<>();
- allNodes3.add(expectedSourceNode3);
- allNodes3.add(expectedProcessorNode3);
- expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(2, allNodes3));
-
- assertThat(topology.describe(), equalTo(expectedDescription));
- }
-
- @Test
- public void multipleSourcesWithSinksShouldHaveDistinctSubtopologies() {
- final TopologyDescription.Source expectedSourceNode1 = addSource("source1", "topic1");
- final TopologyDescription.Sink expectedSinkNode1 = addSink("sink1", "sinkTopic1", expectedSourceNode1);
-
- final TopologyDescription.Source expectedSourceNode2 = addSource("source2", "topic2");
- final TopologyDescription.Sink expectedSinkNode2 = addSink("sink2", "sinkTopic2", expectedSourceNode2);
-
- final TopologyDescription.Source expectedSourceNode3 = addSource("source3", "topic3");
- final TopologyDescription.Sink expectedSinkNode3 = addSink("sink3", "sinkTopic3", expectedSourceNode3);
-
- final Set<TopologyDescription.Node> allNodes1 = new HashSet<>();
- allNodes1.add(expectedSourceNode1);
- allNodes1.add(expectedSinkNode1);
- expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes1));
-
- final Set<TopologyDescription.Node> allNodes2 = new HashSet<>();
- allNodes2.add(expectedSourceNode2);
- allNodes2.add(expectedSinkNode2);
- expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(1, allNodes2));
-
- final Set<TopologyDescription.Node> allNodes3 = new HashSet<>();
- allNodes3.add(expectedSourceNode3);
- allNodes3.add(expectedSinkNode3);
- expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(2, allNodes3));
-
- assertThat(topology.describe(), equalTo(expectedDescription));
- }
-
- @Test
- public void processorsWithSameSinkShouldHaveSameSubtopology() {
- final TopologyDescription.Source expectedSourceNode1 = addSource("source", "topic");
- final TopologyDescription.Processor expectedProcessorNode1 = addProcessor("processor1", expectedSourceNode1);
-
- final TopologyDescription.Source expectedSourceNode2 = addSource("source2", "topic2");
- final TopologyDescription.Processor expectedProcessorNode2 = addProcessor("processor2", expectedSourceNode2);
-
- final TopologyDescription.Source expectedSourceNode3 = addSource("source3", "topic3");
- final TopologyDescription.Processor expectedProcessorNode3 = addProcessor("processor3", expectedSourceNode3);
-
- final TopologyDescription.Sink expectedSinkNode = addSink(
- "sink",
- "sinkTopic",
- expectedProcessorNode1,
- expectedProcessorNode2,
- expectedProcessorNode3);
-
- final Set<TopologyDescription.Node> allNodes = new HashSet<>();
- allNodes.add(expectedSourceNode1);
- allNodes.add(expectedProcessorNode1);
- allNodes.add(expectedSourceNode2);
- allNodes.add(expectedProcessorNode2);
- allNodes.add(expectedSourceNode3);
- allNodes.add(expectedProcessorNode3);
- allNodes.add(expectedSinkNode);
- expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes));
-
- assertThat(topology.describe(), equalTo(expectedDescription));
- }
-
- @Test
- public void processorsWithSharedStateShouldHaveSameSubtopology() {
- final String[] store1 = new String[] {"store1"};
- final String[] store2 = new String[] {"store2"};
- final String[] bothStores = new String[] {store1[0], store2[0]};
-
- final TopologyDescription.Source expectedSourceNode1 = addSource("source", "topic");
- final TopologyDescription.Processor expectedProcessorNode1
- = addProcessorWithNewStore("processor1", store1, expectedSourceNode1);
-
- final TopologyDescription.Source expectedSourceNode2 = addSource("source2", "topic2");
- final TopologyDescription.Processor expectedProcessorNode2
- = addProcessorWithNewStore("processor2", store2, expectedSourceNode2);
-
- final TopologyDescription.Source expectedSourceNode3 = addSource("source3", "topic3");
- final TopologyDescription.Processor expectedProcessorNode3
- = addProcessorWithExistingStore("processor3", bothStores, expectedSourceNode3);
-
- final Set<TopologyDescription.Node> allNodes = new HashSet<>();
- allNodes.add(expectedSourceNode1);
- allNodes.add(expectedProcessorNode1);
- allNodes.add(expectedSourceNode2);
- allNodes.add(expectedProcessorNode2);
- allNodes.add(expectedSourceNode3);
- allNodes.add(expectedProcessorNode3);
- expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes));
-
- assertThat(topology.describe(), equalTo(expectedDescription));
- }
-
- @Test
- public void shouldDescribeGlobalStoreTopology() {
- addGlobalStoreToTopologyAndExpectedDescription("globalStore", "source", "globalTopic", "processor");
- assertThat(topology.describe(), equalTo(expectedDescription));
- }
-
- @Test
- public void shouldDescribeMultipleGlobalStoreTopology() {
- addGlobalStoreToTopologyAndExpectedDescription("globalStore1", "source1", "globalTopic1", "processor1");
- addGlobalStoreToTopologyAndExpectedDescription("globalStore2", "source2", "globalTopic2", "processor2");
- assertThat(topology.describe(), equalTo(expectedDescription));
- }
-
- private TopologyDescription.Source addSource(final String sourceName,
- final String... sourceTopic) {
- topology.addSource(null, sourceName, null, null, null, sourceTopic);
- String allSourceTopics = sourceTopic[0];
- for (int i = 1; i < sourceTopic.length; ++i) {
- allSourceTopics += ", " + sourceTopic[i];
- }
- return new InternalTopologyBuilder.Source(sourceName, allSourceTopics);
- }
-
- private TopologyDescription.Source addSource(final String sourceName,
- final Pattern sourcePattern) {
- topology.addSource(null, sourceName, null, null, null, sourcePattern);
- return new InternalTopologyBuilder.Source(sourceName, sourcePattern.toString());
- }
-
- private TopologyDescription.Processor addProcessor(final String processorName,
- final TopologyDescription.Node... parents) {
- return addProcessorWithNewStore(processorName, new String[0], parents);
- }
-
- private TopologyDescription.Processor addProcessorWithNewStore(final String processorName,
- final String[] storeNames,
- final TopologyDescription.Node... parents) {
- return addProcessorWithStore(processorName, storeNames, true, parents);
- }
-
- private TopologyDescription.Processor addProcessorWithExistingStore(final String processorName,
- final String[] storeNames,
- final TopologyDescription.Node... parents) {
- return addProcessorWithStore(processorName, storeNames, false, parents);
- }
-
- private TopologyDescription.Processor addProcessorWithStore(final String processorName,
- final String[] storeNames,
- final boolean newStores,
- final TopologyDescription.Node... parents) {
- final String[] parentNames = new String[parents.length];
- for (int i = 0; i < parents.length; ++i) {
- parentNames[i] = parents[i].name();
- }
-
- topology.addProcessor(processorName, new MockProcessorSupplier(), parentNames);
- if (newStores) {
- for (final String store : storeNames) {
- topology.addStateStore(new MockStateStoreSupplier(store, false), processorName);
- }
- } else {
- topology.connectProcessorAndStateStores(processorName, storeNames);
- }
- final TopologyDescription.Processor expectedProcessorNode
- = new InternalTopologyBuilder.Processor(processorName, new HashSet<>(Arrays.asList(storeNames)));
-
- for (final TopologyDescription.Node parent : parents) {
- ((InternalTopologyBuilder.AbstractNode) parent).addSuccessor(expectedProcessorNode);
- ((InternalTopologyBuilder.AbstractNode) expectedProcessorNode).addPredecessor(parent);
- }
-
- return expectedProcessorNode;
- }
-
- private TopologyDescription.Sink addSink(final String sinkName,
- final String sinkTopic,
- final TopologyDescription.Node... parents) {
- final String[] parentNames = new String[parents.length];
- for (int i = 0; i < parents.length; ++i) {
- parentNames[i] = parents[i].name();
- }
-
- topology.addSink(sinkName, sinkTopic, null, null, null, parentNames);
- final TopologyDescription.Sink expectedSinkNode
- = new InternalTopologyBuilder.Sink(sinkName, sinkTopic);
-
- for (final TopologyDescription.Node parent : parents) {
- ((InternalTopologyBuilder.AbstractNode) parent).addSuccessor(expectedSinkNode);
- ((InternalTopologyBuilder.AbstractNode) expectedSinkNode).addPredecessor(parent);
- }
-
- return expectedSinkNode;
- }
-
- private void addGlobalStoreToTopologyAndExpectedDescription(final String globalStoreName,
- final String sourceName,
- final String globalTopicName,
- final String processorName) {
- topology.addGlobalStore(
- new MockStateStoreSupplier(globalStoreName, false, false),
- sourceName,
- null,
- null,
- null,
- globalTopicName,
- processorName,
- new MockProcessorSupplier());
-
- final TopologyDescription.GlobalStore expectedGlobalStore = new InternalTopologyBuilder.GlobalStore(
- sourceName,
- processorName,
- globalStoreName,
- globalTopicName);
-
- expectedDescription.addGlobalStore(expectedGlobalStore);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index b98b756..9bd8756 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -19,14 +19,15 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TopologyBuilderException;
+import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
@@ -67,8 +68,8 @@ public class InternalTopologyBuilderTest {
final String earliestTopic = "earliestTopic";
final String latestTopic = "latestTopic";
- builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", null, null, null, earliestTopic);
- builder.addSource(TopologyBuilder.AutoOffsetReset.LATEST, "source2", null, null, null, latestTopic);
+ builder.addSource(Topology.AutoOffsetReset.EARLIEST, "source", null, null, null, earliestTopic);
+ builder.addSource(Topology.AutoOffsetReset.LATEST, "source2", null, null, null, latestTopic);
assertTrue(builder.earliestResetTopicsPattern().matcher(earliestTopic).matches());
assertTrue(builder.latestResetTopicsPattern().matcher(latestTopic).matches());
@@ -79,8 +80,8 @@ public class InternalTopologyBuilderTest {
final String earliestTopicPattern = "earliest.*Topic";
final String latestTopicPattern = "latest.*Topic";
- builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", null, null, null, Pattern.compile(earliestTopicPattern));
- builder.addSource(TopologyBuilder.AutoOffsetReset.LATEST, "source2", null, null, null, Pattern.compile(latestTopicPattern));
+ builder.addSource(Topology.AutoOffsetReset.EARLIEST, "source", null, null, null, Pattern.compile(earliestTopicPattern));
+ builder.addSource(Topology.AutoOffsetReset.LATEST, "source2", null, null, null, Pattern.compile(latestTopicPattern));
assertTrue(builder.earliestResetTopicsPattern().matcher("earliestTestTopic").matches());
assertTrue(builder.latestResetTopicsPattern().matcher("latestTestTopic").matches());
@@ -108,18 +109,18 @@ public class InternalTopologyBuilderTest {
assertEquals(builder.latestResetTopicsPattern().pattern(), "");
}
- @Test(expected = TopologyBuilderException.class)
+ @Test(expected = TopologyException.class)
public void shouldNotAllowOffsetResetSourceWithoutTopics() {
- builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer());
+ builder.addSource(Topology.AutoOffsetReset.EARLIEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer());
}
@Test
public void shouldNotAllowOffsetResetSourceWithDuplicateSourceName() {
- builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer(), "topic-1");
+ builder.addSource(Topology.AutoOffsetReset.EARLIEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer(), "topic-1");
try {
- builder.addSource(TopologyBuilder.AutoOffsetReset.LATEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer(), "topic-2");
- fail("Should throw TopologyBuilderException for duplicate source name");
- } catch (final TopologyBuilderException expected) { /* ok */ }
+ builder.addSource(Topology.AutoOffsetReset.LATEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer(), "topic-2");
+ fail("Should throw TopologyException for duplicate source name");
+ } catch (final TopologyException expected) { /* ok */ }
}
@Test
@@ -127,8 +128,8 @@ public class InternalTopologyBuilderTest {
builder.addSource(null, "source", null, null, null, "topic-1");
try {
builder.addSource(null, "source", null, null, null, "topic-2");
- fail("Should throw TopologyBuilderException with source name conflict");
- } catch (final TopologyBuilderException expected) { /* ok */ }
+ fail("Should throw TopologyException with source name conflict");
+ } catch (final TopologyException expected) { /* ok */ }
}
@Test
@@ -136,8 +137,8 @@ public class InternalTopologyBuilderTest {
builder.addSource(null, "source", null, null, null, "topic-1");
try {
builder.addSource(null, "source-2", null, null, null, "topic-1");
- fail("Should throw TopologyBuilderException with topic conflict");
- } catch (final TopologyBuilderException expected) { /* ok */ }
+ fail("Should throw TopologyException with topic conflict");
+ } catch (final TopologyException expected) { /* ok */ }
}
@Test
@@ -146,16 +147,16 @@ public class InternalTopologyBuilderTest {
builder.addProcessor("processor", new MockProcessorSupplier(), "source");
try {
builder.addProcessor("processor", new MockProcessorSupplier(), "source");
- fail("Should throw TopologyBuilderException with processor name conflict");
- } catch (final TopologyBuilderException expected) { /* ok */ }
+ fail("Should throw TopologyException with processor name conflict");
+ } catch (final TopologyException expected) { /* ok */ }
}
- @Test(expected = TopologyBuilderException.class)
+ @Test(expected = TopologyException.class)
public void testAddProcessorWithWrongParent() {
builder.addProcessor("processor", new MockProcessorSupplier(), "source");
}
- @Test(expected = TopologyBuilderException.class)
+ @Test(expected = TopologyException.class)
public void testAddProcessorWithSelfParent() {
builder.addProcessor("processor", new MockProcessorSupplier(), "processor");
}
@@ -166,16 +167,16 @@ public class InternalTopologyBuilderTest {
builder.addSink("sink", "topic-2", null, null, null, "source");
try {
builder.addSink("sink", "topic-3", null, null, null, "source");
- fail("Should throw TopologyBuilderException with sink name conflict");
- } catch (final TopologyBuilderException expected) { /* ok */ }
+ fail("Should throw TopologyException with sink name conflict");
+ } catch (final TopologyException expected) { /* ok */ }
}
- @Test(expected = TopologyBuilderException.class)
+ @Test(expected = TopologyException.class)
public void testAddSinkWithWrongParent() {
builder.addSink("sink", "topic-2", null, null, null, "source");
}
- @Test(expected = TopologyBuilderException.class)
+ @Test(expected = TopologyException.class)
public void testAddSinkWithSelfParent() {
builder.addSink("sink", "topic-2", null, null, null, "sink");
}
@@ -247,8 +248,8 @@ public class InternalTopologyBuilderTest {
builder.addSource(null, "source-1", null, null, null, "foo");
try {
builder.addSource(null, "source-2", null, null, null, Pattern.compile("f.*"));
- fail("Should throw TopologyBuilderException with topic name/pattern conflict");
- } catch (final TopologyBuilderException expected) { /* ok */ }
+ fail("Should throw TopologyException with topic name/pattern conflict");
+ } catch (final TopologyException expected) { /* ok */ }
}
@Test
@@ -256,11 +257,11 @@ public class InternalTopologyBuilderTest {
builder.addSource(null, "source-1", null, null, null, Pattern.compile("f.*"));
try {
builder.addSource(null, "source-2", null, null, null, "foo");
- fail("Should throw TopologyBuilderException with topic name/pattern conflict");
- } catch (final TopologyBuilderException expected) { /* ok */ }
+ fail("Should throw TopologyException with topic name/pattern conflict");
+ } catch (final TopologyException expected) { /* ok */ }
}
- @Test(expected = TopologyBuilderException.class)
+ @Test(expected = TopologyException.class)
public void testAddStateStoreWithNonExistingProcessor() {
builder.addStateStore(new MockStateStoreSupplier("store", false), "no-such-processsor");
}
@@ -270,8 +271,8 @@ public class InternalTopologyBuilderTest {
builder.addSource(null, "source-1", null, null, null, "topic-1");
try {
builder.addStateStore(new MockStateStoreSupplier("store", false), "source-1");
- fail("Should throw TopologyBuilderException with store cannot be added to source");
- } catch (final TopologyBuilderException expected) { /* ok */ }
+ fail("Should throw TopologyException with store cannot be added to source");
+ } catch (final TopologyException expected) { /* ok */ }
}
@Test
@@ -279,8 +280,8 @@ public class InternalTopologyBuilderTest {
builder.addSink("sink-1", "topic-1", null, null, null);
try {
builder.addStateStore(new MockStateStoreSupplier("store", false), "sink-1");
- fail("Should throw TopologyBuilderException with store cannot be added to sink");
- } catch (final TopologyBuilderException expected) { /* ok */ }
+ fail("Should throw TopologyException with store cannot be added to sink");
+ } catch (final TopologyException expected) { /* ok */ }
}
@Test
@@ -288,8 +289,8 @@ public class InternalTopologyBuilderTest {
builder.addStateStore(new MockStateStoreSupplier("store", false));
try {
builder.addStateStore(new MockStateStoreSupplier("store", false));
- fail("Should throw TopologyBuilderException with store name conflict");
- } catch (final TopologyBuilderException expected) { /* ok */ }
+ fail("Should throw TopologyException with store name conflict");
+ } catch (final TopologyException expected) { /* ok */ }
}
@Test
@@ -326,12 +327,12 @@ public class InternalTopologyBuilderTest {
builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4");
- final Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
+ final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
- final Map<Integer, TopologyBuilder.TopicsInfo> expectedTopicGroups = new HashMap<>();
- expectedTopicGroups.put(0, new TopologyBuilder.TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "X-topic-1x", "topic-2"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, InternalTopicConfig>emptyMap()));
- expectedTopicGroups.put(1, new TopologyBuilder.TopicsInfo(Collections.<String>emptySet(), mkSet("topic-3", "topic-4"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, InternalTopicConfig>emptyMap()));
- expectedTopicGroups.put(2, new TopologyBuilder.TopicsInfo(Collections.<String>emptySet(), mkSet("topic-5"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, InternalTopicConfig>emptyMap()));
+ final Map<Integer, InternalTopologyBuilder.TopicsInfo> expectedTopicGroups = new HashMap<>();
+ expectedTopicGroups.put(0, new InternalTopologyBuilder.TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "X-topic-1x", "topic-2"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, InternalTopicConfig>emptyMap()));
+ expectedTopicGroups.put(1, new InternalTopologyBuilder.TopicsInfo(Collections.<String>emptySet(), mkSet("topic-3", "topic-4"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, InternalTopicConfig>emptyMap()));
+ expectedTopicGroups.put(2, new InternalTopologyBuilder.TopicsInfo(Collections.<String>emptySet(), mkSet("topic-5"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, InternalTopicConfig>emptyMap()));
assertEquals(3, topicGroups.size());
assertEquals(expectedTopicGroups, topicGroups);
@@ -363,13 +364,13 @@ public class InternalTopologyBuilderTest {
builder.addStateStore(supplier);
builder.connectProcessorAndStateStores("processor-5", "store-3");
- final Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
+ final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
- final Map<Integer, TopologyBuilder.TopicsInfo> expectedTopicGroups = new HashMap<>();
+ final Map<Integer, InternalTopologyBuilder.TopicsInfo> expectedTopicGroups = new HashMap<>();
final String store1 = ProcessorStateManager.storeChangelogTopic("X", "store-1");
final String store2 = ProcessorStateManager.storeChangelogTopic("X", "store-2");
final String store3 = ProcessorStateManager.storeChangelogTopic("X", "store-3");
- expectedTopicGroups.put(0, new TopologyBuilder.TopicsInfo(
+ expectedTopicGroups.put(0, new InternalTopologyBuilder.TopicsInfo(
Collections.<String>emptySet(), mkSet("topic-1", "topic-1x", "topic-2"),
Collections.<String, InternalTopicConfig>emptyMap(),
Collections.singletonMap(
@@ -378,7 +379,7 @@ public class InternalTopologyBuilderTest {
store1,
Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
Collections.<String, String>emptyMap()))));
- expectedTopicGroups.put(1, new TopologyBuilder.TopicsInfo(
+ expectedTopicGroups.put(1, new InternalTopologyBuilder.TopicsInfo(
Collections.<String>emptySet(), mkSet("topic-3", "topic-4"),
Collections.<String, InternalTopicConfig>emptyMap(),
Collections.singletonMap(
@@ -387,7 +388,7 @@ public class InternalTopologyBuilderTest {
store2,
Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
Collections.<String, String>emptyMap()))));
- expectedTopicGroups.put(2, new TopologyBuilder.TopicsInfo(
+ expectedTopicGroups.put(2, new InternalTopologyBuilder.TopicsInfo(
Collections.<String>emptySet(), mkSet("topic-5"),
Collections.<String, InternalTopicConfig>emptyMap(),
Collections.singletonMap(store3,
@@ -519,8 +520,8 @@ public class InternalTopologyBuilderTest {
builder.addSource(null, "source", null, null, null, "topic");
builder.addProcessor("processor", new MockProcessorSupplier(), "source");
builder.addStateStore(new RocksDBWindowStoreSupplier("store", 30000, 3, false, null, null, 10000, true, Collections.<String, String>emptyMap(), false), "processor");
- final Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
- final TopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next();
+ final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
+ final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next();
final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-store-changelog");
final Properties properties = topicConfig.toProperties(0);
final List<String> policies = Arrays.asList(properties.getProperty(InternalTopicManager.CLEANUP_POLICY_PROP).split(","));
@@ -539,8 +540,8 @@ public class InternalTopologyBuilderTest {
builder.addSource(null, "source", null, null, null, "topic");
builder.addProcessor("processor", new MockProcessorSupplier(), "source");
builder.addStateStore(new MockStateStoreSupplier("name", true), "processor");
- final Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
- final TopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next();
+ final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
+ final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next();
final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-name-changelog");
final Properties properties = topicConfig.toProperties(0);
assertEquals("appId-name-changelog", topicConfig.name());
@@ -554,7 +555,7 @@ public class InternalTopologyBuilderTest {
builder.setApplicationId("appId");
builder.addInternalTopic("foo");
builder.addSource(null, "source", null, null, null, "foo");
- final TopologyBuilder.TopicsInfo topicsInfo = builder.topicGroups().values().iterator().next();
+ final InternalTopologyBuilder.TopicsInfo topicsInfo = builder.topicGroups().values().iterator().next();
final InternalTopicConfig topicConfig = topicsInfo.repartitionSourceTopics.get("appId-foo");
final Properties properties = topicConfig.toProperties(0);
assertEquals("appId-foo", topicConfig.name());
@@ -562,8 +563,9 @@ public class InternalTopologyBuilderTest {
assertEquals(1, properties.size());
}
- @Test(expected = TopologyBuilderException.class)
- public void shouldThroughOnUnassignedStateStoreAccess() {
+ @SuppressWarnings("deprecation")
+ @Test
+ public void shouldThrowOnUnassignedStateStoreAccess() {
final String sourceNodeName = "source";
final String goodNodeName = "goodGuy";
final String badNodeName = "badGuy";
@@ -573,24 +575,22 @@ public class InternalTopologyBuilderTest {
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
final StreamsConfig streamsConfig = new StreamsConfig(config);
+ builder.addSource(null, sourceNodeName, null, null, null, "topic");
+ builder.addProcessor(goodNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
+ builder.addStateStore(
+ Stores.create(LocalMockProcessorSupplier.STORE_NAME).withStringKeys().withStringValues().inMemory().build(),
+ goodNodeName);
+ builder.addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
+
try {
- builder.addSource(null, sourceNodeName, null, null, null, "topic");
- builder.addProcessor(goodNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
- builder.addStateStore(
- Stores.create(LocalMockProcessorSupplier.STORE_NAME).withStringKeys().withStringValues().inMemory().build(),
- goodNodeName);
- builder.addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName);
-
- final ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(streamsConfig, builder);
- driver.process("topic", null, null);
- } catch (final StreamsException e) {
- final Throwable cause = e.getCause();
- if (cause != null
- && cause instanceof TopologyBuilderException
- && cause.getMessage().equals("Invalid topology building: Processor " + badNodeName + " has no access to StateStore " + LocalMockProcessorSupplier.STORE_NAME)) {
- throw (TopologyBuilderException) cause;
- } else {
- throw new RuntimeException("Did expect different exception. Did catch:", e);
+ new ProcessorTopologyTestDriver(streamsConfig, builder);
+ fail("Should have throw StreamsException");
+ } catch (final StreamsException expected) {
+ final Throwable cause = expected.getCause();
+ if (cause == null
+ || !(cause instanceof TopologyBuilderException)
+ || !cause.getMessage().equals("Invalid topology building: Processor " + badNodeName + " has no access to StateStore " + LocalMockProcessorSupplier.STORE_NAME)) {
+ throw new RuntimeException("Did expect different exception. Did catch:", expected);
}
}
}
@@ -639,7 +639,7 @@ public class InternalTopologyBuilderTest {
builder.updateSubscriptions(subscriptionUpdates, null);
builder.setApplicationId("test-id");
- final Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
+ final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
assertTrue(topicGroups.get(0).sourceTopics.contains("topic-foo"));
assertTrue(topicGroups.get(1).sourceTopics.contains("topic-A"));
assertTrue(topicGroups.get(1).sourceTopics.contains("topic-B"));
@@ -693,7 +693,7 @@ public class InternalTopologyBuilderTest {
assertFalse(topics.contains("topic-A"));
}
- @Test(expected = TopologyBuilderException.class)
+ @Test(expected = TopologyException.class)
public void shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() {
final String sameNameForSourceAndProcessor = "sameName";
builder.addGlobalStore(
http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index f173a65..33e43e2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -35,7 +35,6 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.streams.state.HostInfo;
@@ -526,18 +525,18 @@ public class StreamPartitionAssignorTest {
assertEquals(new HashSet<>(tasks), allTasks);
// check tasks for state topics
- Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = thread10.builder.topicGroups();
+ Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = thread10.builder.topicGroups();
assertEquals(Utils.mkSet(task00, task01, task02), tasksForState(applicationId, "store1", tasks, topicGroups));
assertEquals(Utils.mkSet(task10, task11, task12), tasksForState(applicationId, "store2", tasks, topicGroups));
assertEquals(Utils.mkSet(task10, task11, task12), tasksForState(applicationId, "store3", tasks, topicGroups));
}
- private Set<TaskId> tasksForState(String applicationId, String storeName, List<TaskId> tasks, Map<Integer, TopologyBuilder.TopicsInfo> topicGroups) {
+ private Set<TaskId> tasksForState(String applicationId, String storeName, List<TaskId> tasks, Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups) {
final String changelogTopic = ProcessorStateManager.storeChangelogTopic(applicationId, storeName);
Set<TaskId> ids = new HashSet<>();
- for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
+ for (Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
Set<String> stateChangelogTopics = entry.getValue().stateChangelogTopics.keySet();
if (stateChangelogTopics.contains(changelogTopic)) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index ce25e67..55f8728 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -178,7 +178,7 @@ public class ProcessorTopologyTestDriver {
};
// Identify internal topics for forwarding in process ...
- for (final TopologyBuilder.TopicsInfo topicsInfo : builder.topicGroups().values()) {
+ for (final InternalTopologyBuilder.TopicsInfo topicsInfo : builder.topicGroups().values()) {
internalTopics.addAll(topicsInfo.repartitionSourceTopics.keySet());
}
[3/3] kafka git commit: KAFKA-5670: (KIP-120) Add Topology and
deprecate TopologyBuilder
Posted by gu...@apache.org.
KAFKA-5670: (KIP-120) Add Topology and deprecate TopologyBuilder
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Damian Guy <da...@gmail.com>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>
Closes #3590 from mjsax/kafka-3856-replace-topology-builder-by-topology
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1844bf2b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1844bf2b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1844bf2b
Branch: refs/heads/trunk
Commit: 1844bf2b2f4cdf5a8209d7ceccb6701fc7dcf768
Parents: c50c941
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Fri Jul 28 16:46:34 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Jul 28 16:46:34 2017 -0700
----------------------------------------------------------------------
docs/streams/developer-guide.html | 63 +-
.../wordcount/WordCountProcessorDemo.java | 6 +-
.../org/apache/kafka/streams/KafkaStreams.java | 71 +-
.../java/org/apache/kafka/streams/Topology.java | 640 ++++++++++++++++++
.../kafka/streams/TopologyDescription.java | 7 +-
.../errors/TopologyBuilderException.java | 5 +-
.../kafka/streams/errors/TopologyException.java | 40 ++
.../apache/kafka/streams/kstream/KStream.java | 4 +-
.../kstream/internals/KStreamTransform.java | 1 +
.../internals/KStreamTransformValues.java | 2 +
.../streams/kstream/internals/KTableImpl.java | 10 +-
.../streams/processor/AbstractProcessor.java | 1 +
.../streams/processor/TopologyBuilder.java | 221 +++---
.../internals/GlobalProcessorContextImpl.java | 5 +-
.../internals/InternalTopologyBuilder.java | 242 ++++---
.../internals/ProcessorContextImpl.java | 1 +
.../internals/SourceNodeRecordDeserializer.java | 1 +
.../internals/StreamPartitionAssignor.java | 15 +-
.../org/apache/kafka/streams/TopologyTest.java | 671 +++++++++++++++++++
...eamsFineGrainedAutoResetIntegrationTest.java | 19 +-
.../streams/processor/TopologyBuilderTest.java | 2 +-
.../kafka/streams/processor/TopologyTest.java | 408 -----------
.../internals/InternalTopologyBuilderTest.java | 140 ++--
.../internals/StreamPartitionAssignorTest.java | 7 +-
.../kafka/test/ProcessorTopologyTestDriver.java | 2 +-
25 files changed, 1865 insertions(+), 719 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/docs/streams/developer-guide.html
----------------------------------------------------------------------
diff --git a/docs/streams/developer-guide.html b/docs/streams/developer-guide.html
index 542003d..76c2dc7 100644
--- a/docs/streams/developer-guide.html
+++ b/docs/streams/developer-guide.html
@@ -123,14 +123,14 @@
<h4><a id="streams_processor_topology" href="#streams_processor_topology">Processor Topology</a></h4>
<p>
- With the customized processors defined in the Processor API, developers can use the <code>TopologyBuilder</code> to build a processor topology
+ With the customized processors defined in the Processor API, developers can use <code>Topology</code> to build a processor topology
by connecting these processors together:
</p>
<pre class="brush: java;">
- TopologyBuilder builder = new TopologyBuilder();
+ Topology topology = new Topology();
- builder.addSource("SOURCE", "src-topic")
+ topology.addSource("SOURCE", "src-topic")
// add "PROCESS1" node which takes the source processor "SOURCE" as its upstream processor
.addProcessor("PROCESS1", () -> new MyProcessor1(), "SOURCE")
@@ -180,15 +180,15 @@
</pre>
<p>
- To take advantage of these state stores, developers can use the <code>TopologyBuilder.addStateStore</code> method when building the
+ To take advantage of these state stores, developers can use the <code>Topology.addStateStore</code> method when building the
processor topology to create the local state and associate it with the processor nodes that needs to access it; or they can connect a created
- state store with the existing processor nodes through <code>TopologyBuilder.connectProcessorAndStateStores</code>.
+ state store with the existing processor nodes through <code>Topology.connectProcessorAndStateStores</code>.
</p>
<pre class="brush: java;">
- TopologyBuilder builder = new TopologyBuilder();
+ Topology topology = new Topology();
- builder.addSource("SOURCE", "src-topic")
+ topology.addSource("SOURCE", "src-topic")
.addProcessor("PROCESS1", MyProcessor1::new, "SOURCE")
// add the created state store "COUNTS" associated with processor "PROCESS1"
@@ -204,12 +204,32 @@
.addSink("SINK3", "sink-topic3", "PROCESS3");
</pre>
+ <h4><a id="streams_processor_describe" href="#streams_processor_describe">Describe a <code>Topology</code></a></h4>
+
+ <p>
+ After a <code>Topology</code> is specified it is possible to retrieve a description of the corresponding DAG via <code>#describe()</code> that returns a <code>TopologyDescription</code>.
+ A <code>TopologyDescription</code> contains all added source, processor, and sink nodes as well as all attached stores.
+ For source and sink nodes one can access the specified input/output topic name/pattern.
+ For processor nodes the attached stores are added to the description.
+ Additionally, all nodes have a list of all their connected successor and predecessor nodes.
+ Thus, <code>TopologyDescription</code> allows retrieving the DAG structure of the specified topology.
+ <br />
+ Note that global stores are listed explicitly as they are accessible by all nodes without the need to explicitly connect them.
+ Furthermore, nodes are grouped by <code>Subtopology</code>.
+ Subtopologies are groups of nodes that are directly connected to each other (i.e., either by a direct connection—but not a topic—or by sharing a store).
+ For execution, each <code>Subtopology</code> is executed by <a href="/{{version}}/documentation/streams/architecture#streams_architecture_tasks">one or multiple tasks</a>.
+ Thus, each <code>Subtopology</code> describes an independent unit of work that can be executed by different threads in parallel.
+ <br />
+ Describing a <code>Topology</code> is helpful to reason about tasks and thus maximum parallelism.
+ It is also helpful to get insight into a <code>Topology</code> if it is not specified manually but via the Kafka Streams DSL, which is described in the next section.
+ </p>
+
In the next section we present another way to build the processor topology: the Kafka Streams DSL.
<br>
<h3><a id="streams_dsl" href="#streams_dsl">High-Level Streams DSL</a></h3>
- To build a processor topology using the Streams DSL, developers can apply the <code>KStreamBuilder</code> class, which is extended from the <code>TopologyBuilder</code>.
+ To build a <code>Topology</code> using the Streams DSL, developers can apply the <code>StreamsBuilder</code> class.
A simple example is included with the source code for Kafka in the <code>streams/examples</code> package. The rest of this section will walk
through some code to demonstrate the key steps in creating a topology using the Streams DSL, but we recommend developers to read the full example source
codes for details.
@@ -755,19 +775,19 @@
<pre class="brush: java;">
StreamsConfig config = ...;
- TopologyBuilder builder = ...;
+ Topology topology = ...;
ProcessorSupplier processorSupplier = ...;
// Create CustomStoreSupplier for store name the-custom-store
MyCustomStoreSupplier customStoreSupplier = new MyCustomStoreSupplier("the-custom-store");
// Add the source topic
- builder.addSource("input", "inputTopic");
+ topology.addSource("input", "inputTopic");
// Add a custom processor that reads from the source topic
- builder.addProcessor("the-processor", processorSupplier, "input");
+ topology.addProcessor("the-processor", processorSupplier, "input");
// Connect your custom state store to the custom processor above
- builder.addStateStore(customStoreSupplier, "the-processor");
+ topology.addStateStore(customStoreSupplier, "the-processor");
- KafkaStreams streams = new KafkaStreams(builder, config);
+ KafkaStreams streams = new KafkaStreams(topology, config);
streams.start();
// Get access to the custom store
@@ -1053,33 +1073,36 @@
</p>
<p>
- First, you must create an instance of <code>KafkaStreams</code>. The first argument of the <code>KafkaStreams</code> constructor takes a topology
- builder (either <code>KStreamBuilder</code> for the Kafka Streams DSL, or <code>TopologyBuilder</code> for the Processor API)
- that is used to define a topology; The second argument is an instance of <code>StreamsConfig</code> mentioned above.
+ First, you must create an instance of <code>KafkaStreams</code>.
+ The first argument of the <code>KafkaStreams</code> constructor takes a <code>Topology</code>
+ that is a logical topology description (you can create a <code>Topology</code> either directly or use
+ <code>StreamsBuilder</code> to create one).
+ The second argument is an instance of <code>StreamsConfig</code> mentioned above.
</p>
<pre class="brush: java;">
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
- import org.apache.kafka.streams.processor.TopologyBuilder;
+ import org.apache.kafka.streams.Topology;
// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.
- KStreamBuilder builder = ...; // when using the Kafka Streams DSL
+ Topology topology = ...; // when using the Processor API
//
// OR
//
- TopologyBuilder builder = ...; // when using the Processor API
+ KStreamBuilder builder = ...; // when using the Kafka Streams DSL
+ Topology topology = builder.topology();
// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;
- KafkaStreams streams = new KafkaStreams(builder, config);
+ KafkaStreams streams = new KafkaStreams(topology, config);
</pre>
<p>
http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
index 0ff42a7..34bb8bb 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
@@ -19,15 +19,15 @@ package org.apache.kafka.streams.examples.wordcount;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
-import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
@@ -119,7 +119,7 @@ public class WordCountProcessorDemo {
// setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- TopologyBuilder builder = new TopologyBuilder();
+ Topology builder = new Topology();
builder.addSource("Source", "streams-wordcount-input");
http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 0d88efb..d7c608a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -42,6 +42,7 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamThread;
@@ -121,7 +122,7 @@ import static org.apache.kafka.streams.StreamsConfig.PROCESSING_GUARANTEE_CONFIG
* }</pre>
*
* @see KStreamBuilder
- * @see TopologyBuilder
+ * @see Topology
*/
@InterfaceStability.Evolving
public class KafkaStreams {
@@ -402,36 +403,72 @@ public class KafkaStreams {
}
/**
+ * @deprecated use {@link #KafkaStreams(Topology, Properties)} instead
+ */
+ @Deprecated
+ public KafkaStreams(final TopologyBuilder builder,
+ final Properties props) {
+ this(builder.internalTopologyBuilder, new StreamsConfig(props), new DefaultKafkaClientSupplier());
+ }
+
+ /**
+ * @deprecated use {@link #KafkaStreams(Topology, StreamsConfig)} instead
+ */
+ @Deprecated
+ public KafkaStreams(final TopologyBuilder builder,
+ final StreamsConfig config) {
+ this(builder.internalTopologyBuilder, config, new DefaultKafkaClientSupplier());
+ }
+
+ /**
+ * @deprecated use {@link #KafkaStreams(Topology, StreamsConfig, KafkaClientSupplier)} instead
+ */
+ @Deprecated
+ public KafkaStreams(final TopologyBuilder builder,
+ final StreamsConfig config,
+ final KafkaClientSupplier clientSupplier) {
+ this(builder.internalTopologyBuilder, config, clientSupplier);
+ }
+
+ /**
* Create a {@code KafkaStreams} instance.
*
- * @param builder the processor topology builder specifying the computational logic
+ * @param topology the topology specifying the computational logic
* @param props properties for {@link StreamsConfig}
*/
- public KafkaStreams(final TopologyBuilder builder, final Properties props) {
- this(builder, new StreamsConfig(props), new DefaultKafkaClientSupplier());
+ public KafkaStreams(final Topology topology,
+ final Properties props) {
+ this(topology.internalTopologyBuilder, new StreamsConfig(props), new DefaultKafkaClientSupplier());
}
/**
* Create a {@code KafkaStreams} instance.
*
- * @param builder the processor topology builder specifying the computational logic
+ * @param topology the topology specifying the computational logic
* @param config the Kafka Streams configuration
*/
- public KafkaStreams(final TopologyBuilder builder, final StreamsConfig config) {
- this(builder, config, new DefaultKafkaClientSupplier());
+ public KafkaStreams(final Topology topology,
+ final StreamsConfig config) {
+ this(topology.internalTopologyBuilder, config, new DefaultKafkaClientSupplier());
}
/**
* Create a {@code KafkaStreams} instance.
*
- * @param builder the processor topology builder specifying the computational logic
+ * @param topology the topology specifying the computational logic
* @param config the Kafka Streams configuration
* @param clientSupplier the Kafka clients supplier which provides underlying producer and consumer clients
* for the new {@code KafkaStreams} instance
*/
- public KafkaStreams(final TopologyBuilder builder,
+ public KafkaStreams(final Topology topology,
final StreamsConfig config,
final KafkaClientSupplier clientSupplier) {
+ this(topology.internalTopologyBuilder, config, clientSupplier);
+ }
+
+ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
+ final StreamsConfig config,
+ final KafkaClientSupplier clientSupplier) {
// create the metrics
final Time time = Time.SYSTEM;
@@ -442,7 +479,7 @@ public class KafkaStreams {
// The application ID is a required config and hence should always have value
final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
- builder.setApplicationId(applicationId);
+ internalTopologyBuilder.setApplicationId(applicationId);
String clientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG);
if (clientId.length() <= 0)
@@ -466,16 +503,16 @@ public class KafkaStreams {
GlobalStreamThread.State globalThreadState = null;
final ArrayList<StateStoreProvider> storeProviders = new ArrayList<>();
- streamsMetadataState = new StreamsMetadataState(builder.internalTopologyBuilder, parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
+ streamsMetadataState = new StreamsMetadataState(internalTopologyBuilder, parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
- final ProcessorTopology globalTaskTopology = builder.buildGlobalStateTopology();
+ final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
if (config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) < 0) {
log.warn("{} Negative cache size passed in. Reverting to cache size of 0 bytes.", logPrefix);
}
final long cacheSizeBytes = Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) /
- (config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG) + (globalTaskTopology == null ? 0 : 1)));
+ (config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG) + (globalTaskTopology == null ? 0 : 1)));
stateDirectory = new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time);
if (globalTaskTopology != null) {
@@ -491,7 +528,7 @@ public class KafkaStreams {
}
for (int i = 0; i < threads.length; i++) {
- threads[i] = new StreamThread(builder.internalTopologyBuilder,
+ threads[i] = new StreamThread(internalTopologyBuilder,
config,
clientSupplier,
applicationId,
@@ -509,11 +546,11 @@ public class KafkaStreams {
if (globalTaskTopology != null) {
globalStreamThread.setStateListener(streamStateListener);
}
- for (int i = 0; i < threads.length; i++) {
- threads[i].setStateListener(streamStateListener);
+ for (StreamThread thread : threads) {
+ thread.setStateListener(streamStateListener);
}
- final GlobalStateStoreProvider globalStateStoreProvider = new GlobalStateStoreProvider(builder.globalStateStores());
+ final GlobalStateStoreProvider globalStateStoreProvider = new GlobalStateStoreProvider(internalTopologyBuilder.globalStateStores());
queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
final String cleanupThreadName = clientId + "-CleanupThread";
stateDirCleaner = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/Topology.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java b/streams/src/main/java/org/apache/kafka/streams/Topology.java
new file mode 100644
index 0000000..ca0ac75
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java
@@ -0,0 +1,640 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import org.apache.kafka.streams.processor.internals.ProcessorTopology;
+import org.apache.kafka.streams.processor.internals.SinkNode;
+import org.apache.kafka.streams.processor.internals.SourceNode;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+import java.util.regex.Pattern;
+
+/**
+ * A logical representation of a {@link ProcessorTopology}.
+ * A topology is an acyclic graph of sources, processors, and sinks.
+ * A {@link SourceNode source} is a node in the graph that consumes one or more Kafka topics and forwards them to its
+ * successor nodes.
+ * A {@link Processor processor} is a node in the graph that receives input records from upstream nodes, processes the
+ * records, and optionally forwards new records to one or all of its downstream nodes.
+ * Finally, a {@link SinkNode sink} is a node in the graph that receives records from upstream nodes and writes them to
+ * a Kafka topic.
+ * A {@code Topology} allows you to construct an acyclic graph of these nodes, which can then be passed into a new
+ * {@link KafkaStreams} instance that will then {@link KafkaStreams#start() begin consuming, processing, and producing
+ * records}.
+ */
+public class Topology {
+
+ final InternalTopologyBuilder internalTopologyBuilder = new InternalTopologyBuilder();
+
+ /**
+ * Enum used to define auto offset reset policy when creating {@link KStream} or {@link KTable}
+ */
+ public enum AutoOffsetReset {
+ EARLIEST, LATEST
+ }
+
+ /**
+ * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
+ * The source will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
+ * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
+ * {@link StreamsConfig stream configuration}.
+ * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+ *
+ * @param name the unique name of the source used to reference this node when
+ * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
+ * @param topics the name of one or more Kafka topics that this source is to consume
+ * @return itself
+ * @throws TopologyException if processor is already added or if topics have already been registered by another source
+ */
+ public synchronized Topology addSource(final String name,
+ final String... topics) {
+ internalTopologyBuilder.addSource(null, name, null, null, null, topics);
+ return this;
+ }
+
+ /**
+ * Add a new source that consumes from topics matching the given pattern
+ * and forwards the records to child processor and/or sink nodes.
+ * The source will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
+ * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
+ * {@link StreamsConfig stream configuration}.
+ * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+ *
+ * @param name the unique name of the source used to reference this node when
+ * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
+ * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
+ * @return itself
+ * @throws TopologyException if processor is already added or if topics have already been registered by another source
+ */
+ public synchronized Topology addSource(final String name,
+ final Pattern topicPattern) {
+ internalTopologyBuilder.addSource(null, name, null, null, null, topicPattern);
+ return this;
+ }
+
+ /**
+ * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
+ * The source will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
+ * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
+ * {@link StreamsConfig stream configuration}.
+ * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+ *
+ * @param offsetReset the auto offset reset policy to use for this source if no committed offsets found; acceptable values earliest or latest
+ * @param name the unique name of the source used to reference this node when
+ * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
+ * @param topics the name of one or more Kafka topics that this source is to consume
+ * @return itself
+ * @throws TopologyException if processor is already added or if topics have already been registered by another source
+ */
+ public synchronized Topology addSource(final AutoOffsetReset offsetReset,
+ final String name,
+ final String... topics) {
+ internalTopologyBuilder.addSource(offsetReset, name, null, null, null, topics);
+ return this;
+ }
+
+ /**
+ * Add a new source that consumes from topics matching the given pattern
+ * and forwards the records to child processor and/or sink nodes.
+ * The source will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
+ * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
+ * {@link StreamsConfig stream configuration}.
+ * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+ *
+ * @param offsetReset the auto offset reset policy value for this source if no committed offsets found; acceptable values earliest or latest.
+ * @param name the unique name of the source used to reference this node when
+ * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
+ * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
+ * @return itself
+ * @throws TopologyException if processor is already added or if topics have already been registered by another source
+ */
+ public synchronized Topology addSource(final AutoOffsetReset offsetReset,
+ final String name,
+ final Pattern topicPattern) {
+ internalTopologyBuilder.addSource(offsetReset, name, null, null, null, topicPattern);
+ return this;
+ }
+
+ /**
+ * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
+ * The source will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
+ * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
+ * {@link StreamsConfig stream configuration}.
+ *
+ * @param timestampExtractor the stateless timestamp extractor used for this source,
+ * if not specified the default extractor defined in the configs will be used
+ * @param name the unique name of the source used to reference this node when
+ * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
+ * @param topics the name of one or more Kafka topics that this source is to consume
+ * @return itself
+ * @throws TopologyException if processor is already added or if topics have already been registered by another source
+ */
+ public synchronized Topology addSource(final TimestampExtractor timestampExtractor,
+ final String name,
+ final String... topics) {
+ internalTopologyBuilder.addSource(null, name, timestampExtractor, null, null, topics);
+ return this;
+ }
+
+ /**
+ * Add a new source that consumes from topics matching the given pattern
+ * and forwards the records to child processor and/or sink nodes.
+ * The source will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
+ * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
+ * {@link StreamsConfig stream configuration}.
+ *
+ * @param timestampExtractor the stateless timestamp extractor used for this source,
+ * if not specified the default extractor defined in the configs will be used
+ * @param name the unique name of the source used to reference this node when
+ * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
+ * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
+ * @return itself
+ * @throws TopologyException if processor is already added or if topics have already been registered by another source
+ */
+ public synchronized Topology addSource(final TimestampExtractor timestampExtractor,
+ final String name,
+ final Pattern topicPattern) {
+ internalTopologyBuilder.addSource(null, name, timestampExtractor, null, null, topicPattern);
+ return this;
+ }
+
+ /**
+ * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
+ * The source will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
+ * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
+ * {@link StreamsConfig stream configuration}.
+ *
+ * @param offsetReset the auto offset reset policy to use for this source if no committed offsets found;
+ * acceptable values earliest or latest
+ * @param timestampExtractor the stateless timestamp extractor used for this source,
+ * if not specified the default extractor defined in the configs will be used
+ * @param name the unique name of the source used to reference this node when
+ * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
+ * @param topics the name of one or more Kafka topics that this source is to consume
+ * @return itself
+ * @throws TopologyException if processor is already added or if topics have already been registered by another source
+ */
+ public synchronized Topology addSource(final AutoOffsetReset offsetReset,
+ final TimestampExtractor timestampExtractor,
+ final String name,
+ final String... topics) {
+ internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, null, null, topics);
+ return this;
+ }
+
+ /**
+ * Add a new source that consumes from topics matching the given pattern and forwards the records to child processor
+ * and/or sink nodes.
+ * The source will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
+ * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
+ * {@link StreamsConfig stream configuration}.
+ *
+ * @param offsetReset the auto offset reset policy value for this source if no committed offsets found;
+ * acceptable values earliest or latest.
+ * @param timestampExtractor the stateless timestamp extractor used for this source,
+ * if not specified the default extractor defined in the configs will be used
+ * @param name the unique name of the source used to reference this node when
+ * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
+ * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
+ * @return itself
+ * @throws TopologyException if processor is already added or if topics have already been registered by another source
+ */
+ public synchronized Topology addSource(final AutoOffsetReset offsetReset,
+ final TimestampExtractor timestampExtractor,
+ final String name,
+ final Pattern topicPattern) {
+ internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, null, null, topicPattern);
+ return this;
+ }
+
+ /**
+ * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
+ * The source will use the specified key and value deserializers.
+ * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+ *
+ * @param name the unique name of the source used to reference this node when
+ * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}
+ * @param keyDeserializer key deserializer used to read this source, if not specified the default
+ * key deserializer defined in the configs will be used
+ * @param valueDeserializer value deserializer used to read this source,
+ * if not specified the default value deserializer defined in the configs will be used
+ * @param topics the name of one or more Kafka topics that this source is to consume
+ * @return itself
+ * @throws TopologyException if processor is already added or if topics have already been registered by another source
+ */
+ public synchronized Topology addSource(final String name,
+ final Deserializer keyDeserializer,
+ final Deserializer valueDeserializer,
+ final String... topics) {
+ internalTopologyBuilder.addSource(null, name, null, keyDeserializer, valueDeserializer, topics);
+ return this;
+ }
+
+ /**
+ * Add a new source that consumes from topics matching the given pattern and forwards the records to child processor
+ * and/or sink nodes.
+ * The source will use the specified key and value deserializers.
+ * The provided de-/serializers will be used for all matched topics, so care should be taken to specify patterns for
+ * topics that share the same key-value data format.
+ * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+ *
+ * @param name the unique name of the source used to reference this node when
+ * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}
+ * @param keyDeserializer key deserializer used to read this source, if not specified the default
+ * key deserializer defined in the configs will be used
+ * @param valueDeserializer value deserializer used to read this source,
+ * if not specified the default value deserializer defined in the configs will be used
+ * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
+ * @return itself
+ * @throws TopologyException if processor is already added or if topics have already been registered by name
+ */
+ public synchronized Topology addSource(final String name,
+ final Deserializer keyDeserializer,
+ final Deserializer valueDeserializer,
+ final Pattern topicPattern) {
+ internalTopologyBuilder.addSource(null, name, null, keyDeserializer, valueDeserializer, topicPattern);
+ return this;
+ }
+
+ /**
+ * Add a new source that consumes the named topics and forwards the records to child processor
+ * and/or sink nodes.
+ * The source will use the specified key and value deserializers.
+ * The provided deserializers will be used for all named topics, so care should be taken that these
+ * topics share the same key-value data format.
+ *
+ * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets found;
+ * acceptable values are earliest or latest
+ * @param name the unique name of the source used to reference this node when
+ * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}
+ * @param keyDeserializer key deserializer used to read this source, if not specified the default
+ * key deserializer defined in the configs will be used
+ * @param valueDeserializer value deserializer used to read this source,
+ * if not specified the default value deserializer defined in the configs will be used
+ * @param topics the name of one or more Kafka topics that this source is to consume
+ * @return itself
+ * @throws TopologyException if processor is already added or if topics have already been registered by name
+ */
+ public synchronized Topology addSource(final AutoOffsetReset offsetReset,
+ final String name,
+ final Deserializer keyDeserializer,
+ final Deserializer valueDeserializer,
+ final String... topics) {
+ internalTopologyBuilder.addSource(offsetReset, name, null, keyDeserializer, valueDeserializer, topics);
+ return this;
+ }
+
+ /**
+ * Add a new source that consumes from topics matching the given pattern and forwards the records to child processor
+ * and/or sink nodes.
+ * The source will use the specified key and value deserializers.
+ * The provided de-/serializers will be used for all matched topics, so care should be taken to specify patterns for
+ * topics that share the same key-value data format.
+ *
+ * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets found;
+ * acceptable values are earliest or latest
+ * @param name the unique name of the source used to reference this node when
+ * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}
+ * @param keyDeserializer key deserializer used to read this source, if not specified the default
+ * key deserializer defined in the configs will be used
+ * @param valueDeserializer value deserializer used to read this source,
+ * if not specified the default value deserializer defined in the configs will be used
+ * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
+ * @return itself
+ * @throws TopologyException if processor is already added or if topics have already been registered by name
+ */
+ public synchronized Topology addSource(final AutoOffsetReset offsetReset,
+ final String name,
+ final Deserializer keyDeserializer,
+ final Deserializer valueDeserializer,
+ final Pattern topicPattern) {
+ internalTopologyBuilder.addSource(offsetReset, name, null, keyDeserializer, valueDeserializer, topicPattern);
+ return this;
+ }
+
+ /**
+ * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
+ * The source will use the specified key and value deserializers.
+ *
+ * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets found;
+ * acceptable values are earliest or latest.
+ * @param name the unique name of the source used to reference this node when
+ * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
+ * @param timestampExtractor the stateless timestamp extractor used for this source,
+ * if not specified the default extractor defined in the configs will be used
+ * @param keyDeserializer key deserializer used to read this source, if not specified the default
+ * key deserializer defined in the configs will be used
+ * @param valueDeserializer value deserializer used to read this source,
+ * if not specified the default value deserializer defined in the configs will be used
+ * @param topics the name of one or more Kafka topics that this source is to consume
+ * @return itself
+ * @throws TopologyException if processor is already added or if topics have already been registered by another source
+ */
+
+ public synchronized Topology addSource(final AutoOffsetReset offsetReset,
+ final String name,
+ final TimestampExtractor timestampExtractor,
+ final Deserializer keyDeserializer,
+ final Deserializer valueDeserializer,
+ final String... topics) {
+ internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, keyDeserializer, valueDeserializer, topics);
+ return this;
+ }
+
+ /**
+ * Add a new source that consumes from topics matching the given pattern and forwards the records to child processor
+ * and/or sink nodes.
+ * The source will use the specified key and value deserializers.
+ * The provided de-/serializers will be used for all matched topics, so care should be taken to specify patterns for
+ * topics that share the same key-value data format.
+ *
+ * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets found;
+ * acceptable values are earliest or latest
+ * @param name the unique name of the source used to reference this node when
+ * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
+ * @param timestampExtractor the stateless timestamp extractor used for this source,
+ * if not specified the default extractor defined in the configs will be used
+ * @param keyDeserializer key deserializer used to read this source, if not specified the default
+ * key deserializer defined in the configs will be used
+ * @param valueDeserializer value deserializer used to read this source,
+ * if not specified the default value deserializer defined in the configs will be used
+ * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
+ * @return itself
+ * @throws TopologyException if processor is already added or if topics have already been registered by name
+ */
+ public synchronized Topology addSource(final AutoOffsetReset offsetReset,
+ final String name,
+ final TimestampExtractor timestampExtractor,
+ final Deserializer keyDeserializer,
+ final Deserializer valueDeserializer,
+ final Pattern topicPattern) {
+ internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, keyDeserializer, valueDeserializer, topicPattern);
+ return this;
+ }
+
+ /**
+ * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic.
+ * The sink will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} and
+ * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
+ * {@link StreamsConfig stream configuration}.
+ *
+ * @param name the unique name of the sink
+ * @param topic the name of the Kafka topic to which this sink should write its records
+ * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
+ * and write to its topic
+ * @return itself
+ * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
+ * @see #addSink(String, String, StreamPartitioner, String...)
+ * @see #addSink(String, String, Serializer, Serializer, String...)
+ * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
+ */
+ public synchronized Topology addSink(final String name,
+ final String topic,
+ final String... parentNames) {
+ internalTopologyBuilder.addSink(name, topic, null, null, null, parentNames);
+ return this;
+ }
+
+ /**
+ * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic,
+ * using the supplied partitioner.
+ * The sink will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} and
+ * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
+ * {@link StreamsConfig stream configuration}.
+ * <p>
+ * The sink will also use the specified {@link StreamPartitioner} to determine how records are distributed among
+ * the named Kafka topic's partitions.
+ * Such control is often useful with topologies that use {@link #addStateStore(StateStoreSupplier, String...) state
+ * stores} in their processors.
+ * In most other cases, however, a partitioner need not be specified and Kafka will automatically distribute
+ * records among partitions using Kafka's default partitioning logic.
+ *
+ * @param name the unique name of the sink
+ * @param topic the name of the Kafka topic to which this sink should write its records
+ * @param partitioner the function that should be used to determine the partition for each record processed by the sink
+ * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
+ * and write to its topic
+ * @return itself
+ * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
+ * @see #addSink(String, String, String...)
+ * @see #addSink(String, String, Serializer, Serializer, String...)
+ * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
+ */
+ public synchronized Topology addSink(final String name,
+ final String topic,
+ final StreamPartitioner partitioner,
+ final String... parentNames) {
+ internalTopologyBuilder.addSink(name, topic, null, null, partitioner, parentNames);
+ return this;
+ }
+
+ /**
+ * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic.
+ * The sink will use the specified key and value serializers.
+ *
+ * @param name the unique name of the sink
+ * @param topic the name of the Kafka topic to which this sink should write its records
+ * @param keySerializer the {@link Serializer key serializer} used when consuming records; may be null if the sink
+ * should use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} specified in the
+ * {@link StreamsConfig stream configuration}
+ * @param valueSerializer the {@link Serializer value serializer} used when consuming records; may be null if the sink
+ * should use the {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
+ * {@link StreamsConfig stream configuration}
+ * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
+ * and write to its topic
+ * @return itself
+ * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
+ * @see #addSink(String, String, String...)
+ * @see #addSink(String, String, StreamPartitioner, String...)
+ * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
+ */
+ public synchronized Topology addSink(final String name,
+ final String topic,
+ final Serializer keySerializer,
+ final Serializer valueSerializer,
+ final String... parentNames) {
+ internalTopologyBuilder.addSink(name, topic, keySerializer, valueSerializer, null, parentNames);
+ return this;
+ }
+
+ /**
+ * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic.
+ * The sink will use the specified key and value serializers, and the supplied partitioner.
+ *
+ * @param name the unique name of the sink
+ * @param topic the name of the Kafka topic to which this sink should write its records
+ * @param keySerializer the {@link Serializer key serializer} used when consuming records; may be null if the sink
+ * should use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} specified in the
+ * {@link StreamsConfig stream configuration}
+ * @param valueSerializer the {@link Serializer value serializer} used when consuming records; may be null if the sink
+ * should use the {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
+ * {@link StreamsConfig stream configuration}
+ * @param partitioner the function that should be used to determine the partition for each record processed by the sink
+ * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
+ * and write to its topic
+ * @return itself
+ * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
+ * @see #addSink(String, String, String...)
+ * @see #addSink(String, String, StreamPartitioner, String...)
+ * @see #addSink(String, String, Serializer, Serializer, String...)
+ */
+ public synchronized <K, V> Topology addSink(final String name,
+ final String topic,
+ final Serializer<K> keySerializer,
+ final Serializer<V> valueSerializer,
+ final StreamPartitioner<? super K, ? super V> partitioner,
+ final String... parentNames) {
+ internalTopologyBuilder.addSink(name, topic, keySerializer, valueSerializer, partitioner, parentNames);
+ return this;
+ }
+
+ /**
+ * Add a new processor node that receives and processes records output by one or more parent source or processor
+ * nodes.
+ * Any new record output by this processor will be forwarded to its child processor or sink nodes.
+ *
+ * @param name the unique name of the processor node
+ * @param supplier the supplier used to obtain this node's {@link Processor} instance
+ * @param parentNames the name of one or more source or processor nodes whose output records this processor should receive
+ * and process
+ * @return itself
+ * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
+ */
+ public synchronized Topology addProcessor(final String name,
+ final ProcessorSupplier supplier,
+ final String... parentNames) {
+ internalTopologyBuilder.addProcessor(name, supplier, parentNames);
+ return this;
+ }
+
+ /**
+ * Adds a state store.
+ *
+ * @param supplier the supplier used to obtain the {@link StateStore} instance
+ * @return itself
+ * @throws TopologyException if state store supplier is already added
+ */
+ public synchronized Topology addStateStore(final StateStoreSupplier supplier,
+ final String... processorNames) {
+ internalTopologyBuilder.addStateStore(supplier, processorNames);
+ return this;
+ }
+
+ /**
+ * Adds a global {@link StateStore} to the topology.
+ * The {@link StateStore} sources its data from all partitions of the provided input topic.
+ * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
+ * <p>
+ * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
+ * of the input topic.
+ * <p>
+ * The provided {@link ProcessorSupplier} will be used to create a {@link ProcessorNode} that will receive all
+ * records forwarded from the {@link SourceNode}.
+ * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
+ * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+ *
+ * @param storeSupplier user defined state store supplier
+ * @param sourceName name of the {@link SourceNode} that will be automatically added
+ * @param keyDeserializer the {@link Deserializer} to deserialize keys with
+ * @param valueDeserializer the {@link Deserializer} to deserialize values with
+ * @param topic the topic to source the data from
+ * @param processorName the name of the {@link ProcessorSupplier}
+ * @param stateUpdateSupplier the instance of {@link ProcessorSupplier}
+ * @return itself
+ * @throws TopologyException if the processor of state is already registered
+ */
+ public synchronized Topology addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
+ final String sourceName,
+ final Deserializer keyDeserializer,
+ final Deserializer valueDeserializer,
+ final String topic,
+ final String processorName,
+ final ProcessorSupplier stateUpdateSupplier) {
+ internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, null, keyDeserializer,
+ valueDeserializer, topic, processorName, stateUpdateSupplier);
+ return this;
+ }
+
+ /**
+ * Adds a global {@link StateStore} to the topology.
+ * The {@link StateStore} sources its data from all partitions of the provided input topic.
+ * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
+ * <p>
+ * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
+ * of the input topic.
+ * <p>
+ * The provided {@link ProcessorSupplier} will be used to create a {@link ProcessorNode} that will receive all
+ * records forwarded from the {@link SourceNode}.
+ * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
+ *
+ * @param storeSupplier user defined state store supplier
+ * @param sourceName name of the {@link SourceNode} that will be automatically added
+ * @param timestampExtractor the stateless timestamp extractor used for this source,
+ * if not specified the default extractor defined in the configs will be used
+ * @param keyDeserializer the {@link Deserializer} to deserialize keys with
+ * @param valueDeserializer the {@link Deserializer} to deserialize values with
+ * @param topic the topic to source the data from
+ * @param processorName the name of the {@link ProcessorSupplier}
+ * @param stateUpdateSupplier the instance of {@link ProcessorSupplier}
+ * @return itself
+ * @throws TopologyException if the processor of state is already registered
+ */
+ public synchronized Topology addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
+ final String sourceName,
+ final TimestampExtractor timestampExtractor,
+ final Deserializer keyDeserializer,
+ final Deserializer valueDeserializer,
+ final String topic,
+ final String processorName,
+ final ProcessorSupplier stateUpdateSupplier) {
+ internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer,
+ valueDeserializer, topic, processorName, stateUpdateSupplier);
+ return this;
+ }
+
+ /**
+ * Connects the processor and the state stores.
+ *
+ * @param processorName the name of the processor
+ * @param stateStoreNames the names of state stores that the processor uses
+ * @return itself
+ * @throws TopologyException if the processor or a state store is unknown
+ */
+ public synchronized Topology connectProcessorAndStateStores(final String processorName,
+ final String... stateStoreNames) {
+ internalTopologyBuilder.connectProcessorAndStateStores(processorName, stateStoreNames);
+ return this;
+ }
+
+ public synchronized TopologyDescription describe() {
+ return internalTopologyBuilder.describe();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
index dd481ff..725b7b7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
+++ b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
@@ -35,8 +35,9 @@ public interface TopologyDescription {
/**
* A connected sub-graph of a {@link Topology}.
* <p>
- * Nodes of a {@code Subtopology} are connected {@link Topology#addProcessor(String, ProcessorSupplier, String...)
- * directly} or indirectly via {@link Topology#connectProcessorAndStateStores(String, String...) state stores}
+ * Nodes of a {@code Subtopology} are connected {@link Topology#addProcessor(String,
+ * org.apache.kafka.streams.processor.ProcessorSupplier, String...) directly} or indirectly via
+ * {@link Topology#connectProcessorAndStateStores(String, String...) state stores}
* (i.e., if multiple processors share the same state).
*/
interface Subtopology {
@@ -54,7 +55,7 @@ public interface TopologyDescription {
}
/**
- * Represents a {@link Topology#addGlobalStore(StateStoreSupplier, String,
+ * Represents a {@link Topology#addGlobalStore(org.apache.kafka.streams.processor.StateStoreSupplier, String,
* org.apache.kafka.common.serialization.Deserializer, org.apache.kafka.common.serialization.Deserializer, String,
* String, ProcessorSupplier) global store}.
* Adding a global store results in adding a source node and one stateful processor node.
http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java b/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java
index b9c0c3a..385d401 100644
--- a/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/TopologyBuilderException.java
@@ -18,9 +18,12 @@ package org.apache.kafka.streams.errors;
/**
- * Indicates a pre-run time error incurred while parsing the {@link org.apache.kafka.streams.processor.TopologyBuilder
+ * Indicates a pre-run time error occurred while parsing the {@link org.apache.kafka.streams.processor.TopologyBuilder
* builder} to construct the {@link org.apache.kafka.streams.processor.internals.ProcessorTopology processor topology}.
+ *
+ * @deprecated use {@link org.apache.kafka.streams.Topology} instead of {@link org.apache.kafka.streams.processor.TopologyBuilder}
*/
+@Deprecated
public class TopologyBuilderException extends StreamsException {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/errors/TopologyException.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/TopologyException.java b/streams/src/main/java/org/apache/kafka/streams/errors/TopologyException.java
new file mode 100644
index 0000000..1eaef06
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/TopologyException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+
+/**
+ * Indicates a pre-run time error occurred while parsing the {@link org.apache.kafka.streams.Topology logical topology}
+ * to construct the {@link org.apache.kafka.streams.processor.internals.ProcessorTopology physical processor topology}.
+ */
+public class TopologyException extends StreamsException {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Creates an exception whose message is {@code "Invalid topology"}, followed by {@code ": "} and the
+ * given message when that message is not {@code null}.
+ *
+ * @param message detail describing what is wrong with the topology; may be {@code null}
+ */
+ public TopologyException(final String message) {
+ super("Invalid topology" + (message == null ? "" : ": " + message));
+ }
+
+ /**
+ * Creates an exception with the same {@code "Invalid topology"}-prefixed message as
+ * {@link #TopologyException(String)} and the given cause.
+ *
+ * @param message detail describing what is wrong with the topology; may be {@code null}
+ * @param throwable the cause handed to the superclass constructor
+ */
+ public TopologyException(final String message,
+ final Throwable throwable) {
+ super("Invalid topology" + (message == null ? "" : ": " + message), throwable);
+ }
+
+ /**
+ * Creates an exception from a cause only. Unlike the other constructors, no
+ * {@code "Invalid topology"} prefix is built here; the throwable is passed to the superclass as is.
+ *
+ * @param throwable the cause handed to the superclass constructor
+ */
+ public TopologyException(final Throwable throwable) {
+ super(throwable);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 191931b..535e1e9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -22,13 +22,13 @@ import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
import org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
-import org.apache.kafka.streams.processor.TopologyBuilder;
/**
* {@code KStream} is an abstraction of a <i>record stream</i> of {@link KeyValue} pairs, i.e., each record is an
@@ -42,7 +42,7 @@ import org.apache.kafka.streams.processor.TopologyBuilder;
* <p>
* A {@code KStream} can be transformed record by record, joined with another {@code KStream}, {@link KTable},
* {@link GlobalKTable}, or can be aggregated into a {@link KTable}.
- * Kafka Streams DSL can be mixed-and-matched with Processor API (PAPI) (c.f. {@link TopologyBuilder}) via
+ * Kafka Streams DSL can be mixed-and-matched with Processor API (PAPI) (c.f. {@link Topology}) via
* {@link #process(ProcessorSupplier, String...) process(...)},
* {@link #transform(TransformerSupplier, String...) transform(...)}, and
* {@link #transformValues(ValueTransformerSupplier, String...) transformValues(...)}.
http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
index 93cf410..0afadbb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
@@ -59,6 +59,7 @@ public class KStreamTransform<K, V, K1, V1> implements ProcessorSupplier<K, V> {
context().forward(pair.key, pair.value);
}
+ @SuppressWarnings("deprecation")
@Override
public void punctuate(long timestamp) {
KeyValue<? extends K2, ? extends V2> pair = transformer.punctuate(timestamp);
http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
index a6e9aaf..ab1c302 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
@@ -105,6 +105,7 @@ public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V>
return context.schedule(interval, type, callback);
}
+ @SuppressWarnings("deprecation")
@Override
public void schedule(final long interval) {
context.schedule(interval);
@@ -168,6 +169,7 @@ public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V>
context.forward(key, valueTransformer.transform(value));
}
+ @SuppressWarnings("deprecation")
@Override
public void punctuate(long timestamp) {
if (valueTransformer.punctuate(timestamp) != null) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 679efe5..048670e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -238,22 +238,25 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
return doMapValues(mapper, valueSerde, storeSupplier);
}
+ @SuppressWarnings("deprecation")
@Override
public void print() {
print(null, null, this.name);
}
+ @SuppressWarnings("deprecation")
@Override
public void print(String label) {
print(null, null, label);
}
+ @SuppressWarnings("deprecation")
@Override
public void print(Serde<K> keySerde, Serde<V> valSerde) {
print(keySerde, valSerde, this.name);
}
-
+ @SuppressWarnings("deprecation")
@Override
public void print(Serde<K> keySerde, final Serde<V> valSerde, String label) {
Objects.requireNonNull(label, "label can't be null");
@@ -261,16 +264,19 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
topology.addProcessor(name, new KStreamPrint<>(new PrintForeachAction(null, defaultKeyValueMapper, label), keySerde, valSerde), this.name);
}
+ @SuppressWarnings("deprecation")
@Override
public void writeAsText(String filePath) {
writeAsText(filePath, this.name, null, null);
}
+ @SuppressWarnings("deprecation")
@Override
public void writeAsText(String filePath, String label) {
writeAsText(filePath, label, null, null);
}
+ @SuppressWarnings("deprecation")
@Override
public void writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde) {
writeAsText(filePath, this.name, keySerde, valSerde);
@@ -279,6 +285,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
/**
* @throws TopologyBuilderException if file is not found
*/
+ @SuppressWarnings("deprecation")
@Override
public void writeAsText(String filePath, String label, Serde<K> keySerde, Serde<V> valSerde) {
Objects.requireNonNull(filePath, "filePath can't be null");
@@ -296,6 +303,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
}
}
+ @SuppressWarnings("deprecation")
@Override
public void foreach(final ForeachAction<? super K, ? super V> action) {
Objects.requireNonNull(action, "action can't be null");
http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java
index 49b3c18..1cfe78a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java
@@ -44,6 +44,7 @@ public abstract class AbstractProcessor<K, V> implements Processor<K, V> {
*
* @param timestamp the wallclock time when this method is being called
*/
+ @SuppressWarnings("deprecation")
@Override
public void punctuate(long timestamp) {
// do nothing
[2/3] kafka git commit: KAFKA-5670: (KIP-120) Add Topology and
deprecate TopologyBuilder
Posted by gu...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index ce6ba7b..e6c0d6e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -16,11 +16,12 @@
*/
package org.apache.kafka.streams.processor;
-import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.TopologyBuilderException;
+import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
@@ -33,6 +34,7 @@ import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.Subs
import org.apache.kafka.streams.state.KeyValueStore;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -46,24 +48,29 @@ import java.util.regex.Pattern;
* is a node in the graph that receives records from upstream nodes and writes them to a Kafka topic. This builder allows you
* to construct an acyclic graph of these nodes, and the builder is then passed into a new {@link org.apache.kafka.streams.KafkaStreams}
* instance that will then {@link org.apache.kafka.streams.KafkaStreams#start() begin consuming, processing, and producing records}.
+ *
+ * @deprecated use {@link Topology} instead
*/
-@InterfaceStability.Evolving
+@Deprecated
public class TopologyBuilder {
/**
* NOTE: this member is not needed by developers working with the processor APIs; it is only used
* for internal functionality.
- * @deprecated not part of public API and for internal usage only
*/
- @Deprecated
public final InternalTopologyBuilder internalTopologyBuilder = new InternalTopologyBuilder();
+ /**
+ * Translates a {@link TopologyBuilder.AutoOffsetReset} value into the corresponding
+ * {@link Topology.AutoOffsetReset} value before it is handed to the internal topology builder.
+ *
+ * @param resetPolicy the reset policy given to this builder; may be {@code null}
+ * @return {@code null} if {@code resetPolicy} is {@code null}, {@code EARLIEST} for {@code EARLIEST},
+ * and {@code LATEST} for any other value
+ */
+ private Topology.AutoOffsetReset translateAutoOffsetReset(final TopologyBuilder.AutoOffsetReset resetPolicy) {
+ if (resetPolicy == null) {
+ return null;
+ }
+ return resetPolicy == TopologyBuilder.AutoOffsetReset.EARLIEST ? Topology.AutoOffsetReset.EARLIEST : Topology.AutoOffsetReset.LATEST;
+ }
+
/**
* NOTE: this class is not needed by developers working with the processor APIs; it is only used
* for internal functionality.
- * @deprecated not part of public API and for internal usage only
*/
- @Deprecated
public static class TopicsInfo {
public Set<String> sinkTopics;
public Set<String> sourceTopics;
@@ -108,7 +115,7 @@ public class TopologyBuilder {
}
/**
- * Enum used to define auto offset reset policy when creating {@link KStream} or {@link KTable}
+ * Enum used to define auto offset reset policy when creating {@link KStream} or {@link KTable}.
*/
public enum AutoOffsetReset {
EARLIEST, LATEST
@@ -119,8 +126,7 @@ public class TopologyBuilder {
*/
public TopologyBuilder() {}
- /** @deprecated This class is not part of public API and should never be used by a developer. */
- @Deprecated
+ /** This method is not part of public API and should never be used by a developer. */
public synchronized final TopologyBuilder setApplicationId(final String applicationId) {
internalTopologyBuilder.setApplicationId(applicationId);
return this;
@@ -140,7 +146,11 @@ public class TopologyBuilder {
*/
public synchronized final TopologyBuilder addSource(final String name,
final String... topics) {
- internalTopologyBuilder.addSource(null, name, null, null, null, topics);
+ try {
+ internalTopologyBuilder.addSource(null, name, null, null, null, topics);
+ } catch (final TopologyException e) {
+ throw new TopologyBuilderException(e);
+ }
return this;
}
@@ -160,7 +170,11 @@ public class TopologyBuilder {
public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
final String name,
final String... topics) {
- internalTopologyBuilder.addSource(offsetReset, name, null, null, null, topics);
+ try {
+ internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, null, null, null, topics);
+ } catch (final TopologyException e) {
+ throw new TopologyBuilderException(e);
+ }
return this;
}
@@ -178,8 +192,13 @@ public class TopologyBuilder {
* @return this builder instance so methods can be chained together; never null
*/
public synchronized final TopologyBuilder addSource(final TimestampExtractor timestampExtractor,
- final String name, final String... topics) {
- internalTopologyBuilder.addSource(null, name, timestampExtractor, null, null, topics);
+ final String name,
+ final String... topics) {
+ try {
+ internalTopologyBuilder.addSource(null, name, timestampExtractor, null, null, topics);
+ } catch (final TopologyException e) {
+ throw new TopologyBuilderException(e);
+ }
return this;
}
@@ -199,8 +218,14 @@ public class TopologyBuilder {
* @return this builder instance so methods can be chained together; never null
*/
public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
- final TimestampExtractor timestampExtractor, final String name, final String... topics) {
- internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, null, null, topics);
+ final TimestampExtractor timestampExtractor,
+ final String name,
+ final String... topics) {
+ try {
+ internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, timestampExtractor, null, null, topics);
+ } catch (final TopologyException e) {
+ throw new TopologyBuilderException(e);
+ }
return this;
}
@@ -219,7 +244,11 @@ public class TopologyBuilder {
*/
public synchronized final TopologyBuilder addSource(final String name,
final Pattern topicPattern) {
- internalTopologyBuilder.addSource(null, name, null, null, null, topicPattern);
+ try {
+ internalTopologyBuilder.addSource(null, name, null, null, null, topicPattern);
+ } catch (final TopologyException e) {
+ throw new TopologyBuilderException(e);
+ }
return this;
}
@@ -240,7 +269,11 @@ public class TopologyBuilder {
public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
final String name,
final Pattern topicPattern) {
- internalTopologyBuilder.addSource(offsetReset, name, null, null, null, topicPattern);
+ try {
+ internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, null, null, null, topicPattern);
+ } catch (final TopologyException e) {
+ throw new TopologyBuilderException(e);
+ }
return this;
}
@@ -262,11 +295,14 @@ public class TopologyBuilder {
public synchronized final TopologyBuilder addSource(final TimestampExtractor timestampExtractor,
final String name,
final Pattern topicPattern) {
- internalTopologyBuilder.addSource(null, name, timestampExtractor, null, null, topicPattern);
+ try {
+ internalTopologyBuilder.addSource(null, name, timestampExtractor, null, null, topicPattern);
+ } catch (final TopologyException e) {
+ throw new TopologyBuilderException(e);
+ }
return this;
}
-
/**
* Add a new source that consumes from topics matching the given pattern
* and forward the records to child processor and/or sink nodes.
@@ -287,11 +323,14 @@ public class TopologyBuilder {
final TimestampExtractor timestampExtractor,
final String name,
final Pattern topicPattern) {
- internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, null, null, topicPattern);
+ try {
+ internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, timestampExtractor, null, null, topicPattern);
+ } catch (final TopologyException e) {
+ throw new TopologyBuilderException(e);
+ }
return this;
}
-
/**
* Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
* The source will use the specified key and value deserializers.
@@ -307,12 +346,15 @@ public class TopologyBuilder {
* @return this builder instance so methods can be chained together; never null
* @throws TopologyBuilderException if processor is already added or if topics have already been registered by another source
*/
-
public synchronized final TopologyBuilder addSource(final String name,
final Deserializer keyDeserializer,
final Deserializer valDeserializer,
final String... topics) {
- internalTopologyBuilder.addSource(null, name, null, keyDeserializer, valDeserializer, topics);
+ try {
+ internalTopologyBuilder.addSource(null, name, null, keyDeserializer, valDeserializer, topics);
+ } catch (final TopologyException e) {
+ throw new TopologyBuilderException(e);
+ }
return this;
}
@@ -334,14 +376,17 @@ public class TopologyBuilder {
* @return this builder instance so methods can be chained together; never null
* @throws TopologyBuilderException if processor is already added or if topics have already been registered by another source
*/
-
public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
final String name,
final TimestampExtractor timestampExtractor,
final Deserializer keyDeserializer,
final Deserializer valDeserializer,
final String... topics) {
- internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, keyDeserializer, valDeserializer, topics);
+ try {
+ internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, timestampExtractor, keyDeserializer, valDeserializer, topics);
+ } catch (final TopologyException e) {
+ throw new TopologyBuilderException(e);
+ }
return this;
}
@@ -374,7 +419,11 @@ public class TopologyBuilder {
final String topic,
final String processorName,
final ProcessorSupplier stateUpdateSupplier) {
- internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, null, keyDeserializer, valueDeserializer, topic, processorName, stateUpdateSupplier);
+ try {
+ internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, null, keyDeserializer, valueDeserializer, topic, processorName, stateUpdateSupplier);
+ } catch (final TopologyException e) {
+ throw new TopologyBuilderException(e);
+ }
return this;
}
@@ -409,7 +458,11 @@ public class TopologyBuilder {
final String topic,
final String processorName,
final ProcessorSupplier stateUpdateSupplier) {
- internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer, valueDeserializer, topic, processorName, stateUpdateSupplier);
+ try {
+ internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer, valueDeserializer, topic, processorName, stateUpdateSupplier);
+ } catch (final TopologyException e) {
+ throw new TopologyBuilderException(e);
+ }
return this;
}
@@ -435,7 +488,11 @@ public class TopologyBuilder {
final Deserializer keyDeserializer,
final Deserializer valDeserializer,
final Pattern topicPattern) {
- internalTopologyBuilder.addSource(null, name, null, keyDeserializer, valDeserializer, topicPattern);
+ try {
+ internalTopologyBuilder.addSource(null, name, null, keyDeserializer, valDeserializer, topicPattern);
+ } catch (final TopologyException e) {
+ throw new TopologyBuilderException(e);
+ }
return this;
}
@@ -466,7 +523,11 @@ public class TopologyBuilder {
final Deserializer keyDeserializer,
final Deserializer valDeserializer,
final Pattern topicPattern) {
- internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, keyDeserializer, valDeserializer, topicPattern);
+ try {
+ internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, timestampExtractor, keyDeserializer, valDeserializer, topicPattern);
+ } catch (final TopologyException e) {
+ throw new TopologyBuilderException(e);
+ }
return this;
}
@@ -494,11 +555,14 @@ public class TopologyBuilder {
final Deserializer keyDeserializer,
final Deserializer valDeserializer,
final Pattern topicPattern) {
- internalTopologyBuilder.addSource(offsetReset, name, null, keyDeserializer, valDeserializer, topicPattern);
+ try {
+ internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, null, keyDeserializer, valDeserializer, topicPattern);
+ } catch (final TopologyException e) {
+ throw new TopologyBuilderException(e);
+ }
return this;
}
-
/**
* Add a new sink that forwards records from predecessor nodes (processors and/or sources) to the named Kafka topic.
* The sink will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} and
@@ -517,7 +581,11 @@ public class TopologyBuilder {
public synchronized final TopologyBuilder addSink(final String name,
final String topic,
final String... predecessorNames) {
- internalTopologyBuilder.addSink(name, topic, null, null, null, predecessorNames);
+ try {
+ internalTopologyBuilder.addSink(name, topic, null, null, null, predecessorNames);
+ } catch (final TopologyException e) {
+ throw new TopologyBuilderException(e);
+ }
return this;
}
@@ -548,7 +616,11 @@ public class TopologyBuilder {
final String topic,
final StreamPartitioner partitioner,
final String... predecessorNames) {
- internalTopologyBuilder.addSink(name, topic, null, null, partitioner, predecessorNames);
+ try {
+ internalTopologyBuilder.addSink(name, topic, null, null, partitioner, predecessorNames);
+ } catch (final TopologyException e) {
+ throw new TopologyBuilderException(e);
+ }
return this;
}
@@ -576,7 +648,11 @@ public class TopologyBuilder {
final Serializer keySerializer,
final Serializer valSerializer,
final String... predecessorNames) {
- internalTopologyBuilder.addSink(name, topic, keySerializer, valSerializer, null, predecessorNames);
+ try {
+ internalTopologyBuilder.addSink(name, topic, keySerializer, valSerializer, null, predecessorNames);
+ } catch (final TopologyException e) {
+ throw new TopologyBuilderException(e);
+ }
return this;
}
@@ -607,7 +683,11 @@ public class TopologyBuilder {
final Serializer<V> valSerializer,
final StreamPartitioner<? super K, ? super V> partitioner,
final String... predecessorNames) {
- internalTopologyBuilder.addSink(name, topic, keySerializer, valSerializer, partitioner, predecessorNames);
+ try {
+ internalTopologyBuilder.addSink(name, topic, keySerializer, valSerializer, partitioner, predecessorNames);
+ } catch (final TopologyException e) {
+ throw new TopologyBuilderException(e);
+ }
return this;
}
@@ -624,7 +704,11 @@ public class TopologyBuilder {
public synchronized final TopologyBuilder addProcessor(final String name,
final ProcessorSupplier supplier,
final String... predecessorNames) {
- internalTopologyBuilder.addProcessor(name, supplier, predecessorNames);
+ try {
+ internalTopologyBuilder.addProcessor(name, supplier, predecessorNames);
+ } catch (final TopologyException e) {
+ throw new TopologyBuilderException(e);
+ }
return this;
}
@@ -637,7 +721,11 @@ public class TopologyBuilder {
*/
public synchronized final TopologyBuilder addStateStore(final StateStoreSupplier supplier,
final String... processorNames) {
- internalTopologyBuilder.addStateStore(supplier, processorNames);
+ try {
+ internalTopologyBuilder.addStateStore(supplier, processorNames);
+ } catch (final TopologyException e) {
+ throw new TopologyBuilderException(e);
+ }
return this;
}
@@ -650,7 +738,13 @@ public class TopologyBuilder {
*/
public synchronized final TopologyBuilder connectProcessorAndStateStores(final String processorName,
final String... stateStoreNames) {
- internalTopologyBuilder.connectProcessorAndStateStores(processorName, stateStoreNames);
+ if (stateStoreNames != null && stateStoreNames.length > 0) {
+ try {
+ internalTopologyBuilder.connectProcessorAndStateStores(processorName, stateStoreNames);
+ } catch (final TopologyException e) {
+ throw new TopologyBuilderException(e);
+ }
+ }
return this;
}
@@ -660,10 +754,7 @@ public class TopologyBuilder {
*
* NOTE: this function is not needed by developers working with the processor APIs; it is only used
* for the high-level DSL parsing functionality.
- *
- * @deprecated not part of public API and for internal usage only
*/
- @Deprecated
protected synchronized final TopologyBuilder connectSourceStoreAndTopic(final String sourceStoreName,
final String topic) {
internalTopologyBuilder.connectSourceStoreAndTopic(sourceStoreName, topic);
@@ -679,9 +770,7 @@ public class TopologyBuilder {
* @param processorNames the name of the processors
* @return this builder instance so methods can be chained together; never null
* @throws TopologyBuilderException if less than two processors are specified, or if one of the processors is not added yet
- * @deprecated not part of public API and for internal usage only
*/
- @Deprecated
public synchronized final TopologyBuilder connectProcessors(final String... processorNames) {
internalTopologyBuilder.connectProcessors(processorNames);
return this;
@@ -695,9 +784,7 @@ public class TopologyBuilder {
*
* @param topicName the name of the topic
* @return this builder instance so methods can be chained together; never null
- * @deprecated not part of public API and for internal usage only
*/
- @Deprecated
public synchronized final TopologyBuilder addInternalTopic(final String topicName) {
internalTopologyBuilder.addInternalTopic(topicName);
return this;
@@ -711,9 +798,7 @@ public class TopologyBuilder {
*
* @param sourceNodes a set of source node names
* @return this builder instance so methods can be chained together; never null
- * @deprecated not part of public API and for internal usage only
*/
- @Deprecated
public synchronized final TopologyBuilder copartitionSources(final Collection<String> sourceNodes) {
internalTopologyBuilder.copartitionSources(sourceNodes);
return this;
@@ -726,9 +811,7 @@ public class TopologyBuilder {
* for the high-level DSL parsing functionalities.
*
* @return groups of node names
- * @deprecated not part of public API and for internal usage only
*/
- @Deprecated
public synchronized Map<Integer, Set<String>> nodeGroups() {
return internalTopologyBuilder.nodeGroups();
}
@@ -741,9 +824,7 @@ public class TopologyBuilder {
* for the high-level DSL parsing functionalities.
*
* @see org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig)
- * @deprecated not part of public API and for internal usage only
*/
- @Deprecated
public synchronized ProcessorTopology build(final Integer topicGroupId) {
return internalTopologyBuilder.build(topicGroupId);
}
@@ -755,9 +836,7 @@ public class TopologyBuilder {
* for the high-level DSL parsing functionalities.
*
* @return ProcessorTopology
- * @deprecated not part of public API and for internal usage only
*/
- @Deprecated
public synchronized ProcessorTopology buildGlobalStateTopology() {
return internalTopologyBuilder.buildGlobalStateTopology();
}
@@ -770,9 +849,7 @@ public class TopologyBuilder {
* for the high-level DSL parsing functionalities.
*
* @return map containing all global {@link StateStore}s
- * @deprecated not part of public API and for internal usage only
*/
- @Deprecated
public Map<String, StateStore> globalStateStores() {
return internalTopologyBuilder.globalStateStores();
}
@@ -785,11 +862,22 @@ public class TopologyBuilder {
* for the high-level DSL parsing functionalities.
*
* @return groups of topic names
- * @deprecated not part of public API and for internal usage only
*/
- @Deprecated
public synchronized Map<Integer, TopicsInfo> topicGroups() {
- return internalTopologyBuilder.topicGroups();
+     // The internal builder hands back InternalTopologyBuilder.TopicsInfo values; each one is re-wrapped
+     // in this class's own TopicsInfo so that the return type of this method stays as it was.
+     final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroupsWithNewTopicsInfo = internalTopologyBuilder.topicGroups();
+     // LinkedHashMap rather than HashMap: keeps the iteration order of the map returned by the internal
+     // builder, which callers saw while that map was still returned directly.
+     final Map<Integer, TopicsInfo> topicGroupsWithDeprecatedTopicInfo = new java.util.LinkedHashMap<>();
+
+     for (final Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroupsWithNewTopicsInfo.entrySet()) {
+         final InternalTopologyBuilder.TopicsInfo newTopicsInfo = entry.getValue();
+
+         // argument order: sink topics, source topics, repartition source topics, state changelog topics
+         topicGroupsWithDeprecatedTopicInfo.put(entry.getKey(), new TopicsInfo(
+             newTopicsInfo.sinkTopics,
+             newTopicsInfo.sourceTopics,
+             newTopicsInfo.repartitionSourceTopics,
+             newTopicsInfo.stateChangelogTopics));
+     }
+
+     return topicGroupsWithDeprecatedTopicInfo;
}
/**
@@ -799,9 +887,7 @@ public class TopologyBuilder {
* for the high-level DSL parsing functionalities.
*
* @return the Pattern for matching all topics reading from earliest offset, never null
- * @deprecated not part of public API and for internal usage only
*/
- @Deprecated
public synchronized Pattern earliestResetTopicsPattern() {
return internalTopologyBuilder.earliestResetTopicsPattern();
}
@@ -813,9 +899,7 @@ public class TopologyBuilder {
* for the high-level DSL parsing functionalities.
*
* @return the Pattern for matching all topics reading from latest offset, never null
- * @deprecated not part of public API and for internal usage only
*/
- @Deprecated
public synchronized Pattern latestResetTopicsPattern() {
return internalTopologyBuilder.latestResetTopicsPattern();
}
@@ -825,9 +909,7 @@ public class TopologyBuilder {
* for the high-level DSL parsing functionalities.
*
* @return a mapping from state store name to a Set of source Topics.
- * @deprecated not part of public API and for internal usage only
*/
- @Deprecated
public Map<String, List<String>> stateStoreNameToSourceTopics() {
return internalTopologyBuilder.stateStoreNameToSourceTopics();
}
@@ -840,9 +922,7 @@ public class TopologyBuilder {
* for the high-level DSL parsing functionalities.
*
* @return groups of topic names
- * @deprecated not part of public API and for internal usage only
*/
- @Deprecated
public synchronized Collection<Set<String>> copartitionGroups() {
return internalTopologyBuilder.copartitionGroups();
}
@@ -850,10 +930,7 @@ public class TopologyBuilder {
/**
* NOTE this function would not needed by developers working with the processor APIs, but only used
* for the high-level DSL parsing functionalities.
- *
- * @deprecated not part of public API and for internal usage only
*/
- @Deprecated
public SubscriptionUpdates subscriptionUpdates() {
return internalTopologyBuilder.subscriptionUpdates();
}
@@ -861,10 +938,7 @@ public class TopologyBuilder {
/**
* NOTE this function would not needed by developers working with the processor APIs, but only used
* for the high-level DSL parsing functionalities.
- *
- * @deprecated not part of public API and for internal usage only
*/
- @Deprecated
public synchronized Pattern sourceTopicPattern() {
return internalTopologyBuilder.sourceTopicPattern();
}
@@ -872,10 +946,7 @@ public class TopologyBuilder {
/**
* NOTE this function would not needed by developers working with the processor APIs, but only used
* for the high-level DSL parsing functionalities.
- *
- * @deprecated not part of public API and for internal usage only
*/
- @Deprecated
public synchronized void updateSubscriptions(final SubscriptionUpdates subscriptionUpdates,
final String threadId) {
internalTopologyBuilder.updateSubscriptions(subscriptionUpdates, threadId);
http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index 4c1d350..7925b14 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
-import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
@@ -38,9 +37,6 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext {
super(new TaskId(-1, -1), config.getString(StreamsConfig.APPLICATION_ID_CONFIG), config, metrics, stateMgr, cache);
}
- /**
- * @throws TopologyBuilderException if an attempt is made to access this state store from an unknown node
- */
@Override
public StateStore getStateStore(final String name) {
return stateManager.getGlobalStore(name);
@@ -95,6 +91,7 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext {
/**
* @throws UnsupportedOperationException on every invocation
*/
+ @SuppressWarnings("deprecation")
@Override
public void schedule(long interval) {
throw new UnsupportedOperationException("this should not happen: schedule() not supported in global processor context.");
http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index ff65d31..0d5cd48 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -19,13 +19,13 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.errors.TopologyBuilderException;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TimestampExtractor;
-import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.internals.WindowStoreSupplier;
@@ -207,7 +207,7 @@ public class InternalTopologyBuilder {
} else if (topicToPatterns.containsKey(update) && isMatch(update)) {
// the same topic cannot be matched to more than one pattern
// TODO: we should lift this requirement in the future
- throw new TopologyBuilderException("Topic " + update +
+ throw new TopologyException("Topic " + update +
" is already matched for another regex pattern " + topicToPatterns.get(update) +
" and hence cannot be matched to this regex pattern " + pattern + " any more.");
} else if (isMatch(update)) {
@@ -293,18 +293,18 @@ public class InternalTopologyBuilder {
return this;
}
- public final void addSource(final TopologyBuilder.AutoOffsetReset offsetReset,
+ public final void addSource(final Topology.AutoOffsetReset offsetReset,
final String name,
final TimestampExtractor timestampExtractor,
final Deserializer keyDeserializer,
final Deserializer valDeserializer,
final String... topics) {
if (topics.length == 0) {
- throw new TopologyBuilderException("You must provide at least one topic");
+ throw new TopologyException("You must provide at least one topic");
}
Objects.requireNonNull(name, "name must not be null");
if (nodeFactories.containsKey(name)) {
- throw new TopologyBuilderException("Processor " + name + " is already added.");
+ throw new TopologyException("Processor " + name + " is already added.");
}
for (final String topic : topics) {
@@ -319,67 +319,7 @@ public class InternalTopologyBuilder {
nodeGrouper.add(name);
}
- public final void addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
- final String sourceName,
- final TimestampExtractor timestampExtractor,
- final Deserializer keyDeserializer,
- final Deserializer valueDeserializer,
- final String topic,
- final String processorName,
- final ProcessorSupplier stateUpdateSupplier) {
- Objects.requireNonNull(storeSupplier, "store supplier must not be null");
- Objects.requireNonNull(sourceName, "sourceName must not be null");
- Objects.requireNonNull(topic, "topic must not be null");
- Objects.requireNonNull(stateUpdateSupplier, "supplier must not be null");
- Objects.requireNonNull(processorName, "processorName must not be null");
- if (nodeFactories.containsKey(sourceName)) {
- throw new TopologyBuilderException("Processor " + sourceName + " is already added.");
- }
- if (nodeFactories.containsKey(processorName)) {
- throw new TopologyBuilderException("Processor " + processorName + " is already added.");
- }
- if (stateFactories.containsKey(storeSupplier.name()) || globalStateStores.containsKey(storeSupplier.name())) {
- throw new TopologyBuilderException("StateStore " + storeSupplier.name() + " is already added.");
- }
- if (storeSupplier.loggingEnabled()) {
- throw new TopologyBuilderException("StateStore " + storeSupplier.name() + " for global table must not have logging enabled.");
- }
- if (sourceName.equals(processorName)) {
- throw new TopologyBuilderException("sourceName and processorName must be different.");
- }
-
- validateTopicNotAlreadyRegistered(topic);
-
- globalTopics.add(topic);
- final String[] topics = {topic};
- nodeFactories.put(sourceName, new SourceNodeFactory(sourceName, topics, null, timestampExtractor, keyDeserializer, valueDeserializer));
- nodeToSourceTopics.put(sourceName, Arrays.asList(topics));
- nodeGrouper.add(sourceName);
-
- final String[] predecessors = {sourceName};
- final ProcessorNodeFactory nodeFactory = new ProcessorNodeFactory(processorName, predecessors, stateUpdateSupplier);
- nodeFactory.addStateStore(storeSupplier.name());
- nodeFactories.put(processorName, nodeFactory);
- nodeGrouper.add(processorName);
- nodeGrouper.unite(processorName, predecessors);
-
- globalStateStores.put(storeSupplier.name(), storeSupplier.get());
- connectSourceStoreAndTopic(storeSupplier.name(), topic);
- }
-
- private void validateTopicNotAlreadyRegistered(final String topic) {
- if (sourceTopicNames.contains(topic) || globalTopics.contains(topic)) {
- throw new TopologyBuilderException("Topic " + topic + " has already been registered by another source.");
- }
-
- for (final Pattern pattern : nodeToSourcePatterns.values()) {
- if (pattern.matcher(topic).matches()) {
- throw new TopologyBuilderException("Topic " + topic + " matches a Pattern already registered by another source.");
- }
- }
- }
-
- public final void addSource(final TopologyBuilder.AutoOffsetReset offsetReset,
+ public final void addSource(final Topology.AutoOffsetReset offsetReset,
final String name,
final TimestampExtractor timestampExtractor,
final Deserializer keyDeserializer,
@@ -389,12 +329,12 @@ public class InternalTopologyBuilder {
Objects.requireNonNull(name, "name can't be null");
if (nodeFactories.containsKey(name)) {
- throw new TopologyBuilderException("Processor " + name + " is already added.");
+ throw new TopologyException("Processor " + name + " is already added.");
}
for (final String sourceTopicName : sourceTopicNames) {
if (topicPattern.matcher(sourceTopicName).matches()) {
- throw new TopologyBuilderException("Pattern " + topicPattern + " will match a topic that has already been registered by another source.");
+ throw new TopologyException("Pattern " + topicPattern + " will match a topic that has already been registered by another source.");
}
}
@@ -414,15 +354,18 @@ public class InternalTopologyBuilder {
Objects.requireNonNull(name, "name must not be null");
Objects.requireNonNull(topic, "topic must not be null");
if (nodeFactories.containsKey(name)) {
- throw new TopologyBuilderException("Processor " + name + " is already added.");
+ throw new TopologyException("Processor " + name + " is already added.");
}
for (final String predecessor : predecessorNames) {
if (predecessor.equals(name)) {
- throw new TopologyBuilderException("Processor " + name + " cannot be a predecessor of itself.");
+ throw new TopologyException("Processor " + name + " cannot be a predecessor of itself.");
}
if (!nodeFactories.containsKey(predecessor)) {
- throw new TopologyBuilderException("Predecessor processor " + predecessor + " is not added yet.");
+ throw new TopologyException("Predecessor processor " + predecessor + " is not added yet.");
+ }
+ if (nodeToSinkTopic.containsKey(predecessor)) {
+ throw new TopologyException("Sink " + predecessor + " cannot be used a parent.");
}
}
@@ -438,15 +381,15 @@ public class InternalTopologyBuilder {
Objects.requireNonNull(name, "name must not be null");
Objects.requireNonNull(supplier, "supplier must not be null");
if (nodeFactories.containsKey(name)) {
- throw new TopologyBuilderException("Processor " + name + " is already added.");
+ throw new TopologyException("Processor " + name + " is already added.");
}
for (final String predecessor : predecessorNames) {
if (predecessor.equals(name)) {
- throw new TopologyBuilderException("Processor " + name + " cannot be a predecessor of itself.");
+ throw new TopologyException("Processor " + name + " cannot be a predecessor of itself.");
}
if (!nodeFactories.containsKey(predecessor)) {
- throw new TopologyBuilderException("Predecessor processor " + predecessor + " is not added yet.");
+ throw new TopologyException("Predecessor processor " + predecessor + " is not added yet.");
}
}
@@ -459,7 +402,7 @@ public class InternalTopologyBuilder {
final String... processorNames) {
Objects.requireNonNull(supplier, "supplier can't be null");
if (stateFactories.containsKey(supplier.name())) {
- throw new TopologyBuilderException("StateStore " + supplier.name() + " is already added.");
+ throw new TopologyException("StateStore " + supplier.name() + " is already added.");
}
stateFactories.put(supplier.name(), new StateStoreFactory(supplier));
@@ -471,43 +414,109 @@ public class InternalTopologyBuilder {
}
}
+ /**
+  * Registers a global state store together with a source node for {@code topic} and a processor
+  * node (built from {@code stateUpdateSupplier}) that is attached to the store.
+  *
+  * @throws NullPointerException if {@code storeSupplier}, {@code sourceName}, {@code topic},
+  *                              {@code stateUpdateSupplier} or {@code processorName} is null
+  * @throws TopologyException    if a node or store name is already in use, the store has logging
+  *                              enabled, both node names are equal, or the topic is already registered
+  */
+ public final void addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
+                                  final String sourceName,
+                                  final TimestampExtractor timestampExtractor,
+                                  final Deserializer keyDeserializer,
+                                  final Deserializer valueDeserializer,
+                                  final String topic,
+                                  final String processorName,
+                                  final ProcessorSupplier stateUpdateSupplier) {
+     Objects.requireNonNull(storeSupplier, "store supplier must not be null");
+     Objects.requireNonNull(sourceName, "sourceName must not be null");
+     Objects.requireNonNull(topic, "topic must not be null");
+     Objects.requireNonNull(stateUpdateSupplier, "supplier must not be null");
+     Objects.requireNonNull(processorName, "processorName must not be null");
+
+     // both node names must still be free; the source name is checked first
+     for (final String nodeName : new String[] {sourceName, processorName}) {
+         if (nodeFactories.containsKey(nodeName)) {
+             throw new TopologyException("Processor " + nodeName + " is already added.");
+         }
+     }
+
+     final String storeName = storeSupplier.name();
+     final boolean storeNameTaken = stateFactories.containsKey(storeName) || globalStateStores.containsKey(storeName);
+     if (storeNameTaken) {
+         throw new TopologyException("StateStore " + storeName + " is already added.");
+     }
+     if (storeSupplier.loggingEnabled()) {
+         throw new TopologyException("StateStore " + storeName + " for global table must not have logging enabled.");
+     }
+     if (sourceName.equals(processorName)) {
+         throw new TopologyException("sourceName and processorName must be different.");
+     }
+
+     validateTopicNotAlreadyRegistered(topic);
+
+     // source node reading the single global topic
+     globalTopics.add(topic);
+     final String[] globalSourceTopics = {topic};
+     nodeFactories.put(sourceName, new SourceNodeFactory(sourceName, globalSourceTopics, null, timestampExtractor, keyDeserializer, valueDeserializer));
+     nodeToSourceTopics.put(sourceName, Arrays.asList(globalSourceTopics));
+     nodeGrouper.add(sourceName);
+
+     // processor node fed by that source and connected to the store
+     final String[] parents = {sourceName};
+     final ProcessorNodeFactory processorFactory = new ProcessorNodeFactory(processorName, parents, stateUpdateSupplier);
+     processorFactory.addStateStore(storeName);
+     nodeFactories.put(processorName, processorFactory);
+     nodeGrouper.add(processorName);
+     nodeGrouper.unite(processorName, parents);
+
+     globalStateStores.put(storeName, storeSupplier.get());
+     connectSourceStoreAndTopic(storeName, topic);
+ }
+
+ /**
+  * Rejects a topic that is already claimed by a source, either by name or through a pattern.
+  *
+  * @throws TopologyException if {@code topic} is contained in the source topic names or the global
+  *                           topics, or is matched by one of the registered source patterns
+  */
+ private void validateTopicNotAlreadyRegistered(final String topic) {
+     final boolean registeredByName = sourceTopicNames.contains(topic) || globalTopics.contains(topic);
+     if (registeredByName) {
+         throw new TopologyException("Topic " + topic + " has already been registered by another source.");
+     }
+
+     for (final Pattern sourcePattern : nodeToSourcePatterns.values()) {
+         final boolean matchedByPattern = sourcePattern.matcher(topic).matches();
+         if (matchedByPattern) {
+             throw new TopologyException("Topic " + topic + " matches a Pattern already registered by another source.");
+         }
+     }
+ }
+
public final void connectProcessorAndStateStores(final String processorName,
final String... stateStoreNames) {
Objects.requireNonNull(processorName, "processorName can't be null");
- if (stateStoreNames != null) {
- for (final String stateStoreName : stateStoreNames) {
- connectProcessorAndStateStore(processorName, stateStoreName);
- }
+ Objects.requireNonNull(stateStoreNames, "stateStoreNames can't be null"); // a null name array is now an error; the replaced code skipped it silently
+ if (stateStoreNames.length == 0) { // an empty name list is rejected as well instead of being a no-op
+ throw new TopologyException("Must provide at least one state store name.");
+ }
+ for (final String stateStoreName : stateStoreNames) { // connect the processor to each named store in turn
+ connectProcessorAndStateStore(processorName, stateStoreName);
}
}
+ // TODO: this method is only used by DSL and we might want to refactor this part
public final void connectSourceStoreAndTopic(final String sourceStoreName,
final String topic) {
if (storeToChangelogTopic.containsKey(sourceStoreName)) {
- throw new TopologyBuilderException("Source store " + sourceStoreName + " is already added.");
+ throw new TopologyException("Source store " + sourceStoreName + " is already added.");
}
storeToChangelogTopic.put(sourceStoreName, topic);
}
+ // TODO: this method is only used by DSL and we might want to refactor this part
public final void connectProcessors(final String... processorNames) {
if (processorNames.length < 2) {
- throw new TopologyBuilderException("At least two processors need to participate in the connection.");
+ throw new TopologyException("At least two processors need to participate in the connection.");
}
for (final String processorName : processorNames) {
if (!nodeFactories.containsKey(processorName)) {
- throw new TopologyBuilderException("Processor " + processorName + " is not added yet.");
+ throw new TopologyException("Processor " + processorName + " is not added yet.");
}
}
nodeGrouper.unite(processorNames[0], Arrays.copyOfRange(processorNames, 1, processorNames.length));
}
+ // TODO: this method is only used by DSL and we might want to refactor this part
public final void addInternalTopic(final String topicName) {
Objects.requireNonNull(topicName, "topicName can't be null");
internalTopicNames.add(topicName);
}
+ // TODO: this method is only used by DSL and we might want to refactor this part
public final void copartitionSources(final Collection<String> sourceNodes) {
copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
}
@@ -515,10 +524,10 @@ public class InternalTopologyBuilder {
private void connectProcessorAndStateStore(final String processorName,
final String stateStoreName) {
if (!stateFactories.containsKey(stateStoreName)) {
- throw new TopologyBuilderException("StateStore " + stateStoreName + " is not added yet.");
+ throw new TopologyException("StateStore " + stateStoreName + " is not added yet.");
}
if (!nodeFactories.containsKey(processorName)) {
- throw new TopologyBuilderException("Processor " + processorName + " is not added yet.");
+ throw new TopologyException("Processor " + processorName + " is not added yet.");
}
final StateStoreFactory stateStoreFactory = stateFactories.get(stateStoreName);
@@ -535,7 +544,7 @@ public class InternalTopologyBuilder {
processorNodeFactory.addStateStore(stateStoreName);
connectStateStoreNameToSourceTopicsOrPattern(stateStoreName, processorNodeFactory);
} else {
- throw new TopologyBuilderException("cannot connect a state store " + stateStoreName + " to a source node or a sink node.");
+ throw new TopologyException("cannot connect a state store " + stateStoreName + " to a source node or a sink node.");
}
}
@@ -588,7 +597,7 @@ public class InternalTopologyBuilder {
private <T> void maybeAddToResetList(final Collection<T> earliestResets,
final Collection<T> latestResets,
- final TopologyBuilder.AutoOffsetReset offsetReset,
+ final Topology.AutoOffsetReset offsetReset,
final T item) {
if (offsetReset != null) {
switch (offsetReset) {
@@ -599,7 +608,7 @@ public class InternalTopologyBuilder {
latestResets.add(item);
break;
default:
- throw new TopologyBuilderException(String.format("Unrecognized reset format %s", offsetReset));
+ throw new TopologyException(String.format("Unrecognized reset format %s", offsetReset));
}
}
}
@@ -759,7 +768,7 @@ public class InternalTopologyBuilder {
}
}
} else {
- throw new TopologyBuilderException("Unknown definition class: " + factory.getClass().getName());
+ throw new TopologyException("Unknown definition class: " + factory.getClass().getName());
}
}
}
@@ -782,8 +791,8 @@ public class InternalTopologyBuilder {
*
* @return groups of topic names
*/
- public synchronized Map<Integer, TopologyBuilder.TopicsInfo> topicGroups() {
- final Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = new LinkedHashMap<>();
+ public synchronized Map<Integer, TopicsInfo> topicGroups() {
+ final Map<Integer, TopicsInfo> topicGroups = new LinkedHashMap<>();
if (nodeGroups == null) {
nodeGroups = makeNodeGroups();
@@ -839,7 +848,7 @@ public class InternalTopologyBuilder {
}
}
if (!sourceTopics.isEmpty()) {
- topicGroups.put(entry.getKey(), new TopologyBuilder.TopicsInfo(
+ topicGroups.put(entry.getKey(), new TopicsInfo(
Collections.unmodifiableSet(sinkTopics),
Collections.unmodifiableSet(sourceTopics),
Collections.unmodifiableMap(internalSourceTopics),
@@ -921,7 +930,7 @@ public class InternalTopologyBuilder {
final Set<String> otherTopics) {
for (final Pattern otherPattern : otherPatterns) {
if (builtPattern.pattern().contains(otherPattern.pattern())) {
- throw new TopologyBuilderException(
+ throw new TopologyException(
String.format("Found overlapping regex [%s] against [%s] for a KStream with auto offset resets",
otherPattern.pattern(),
builtPattern.pattern()));
@@ -930,7 +939,7 @@ public class InternalTopologyBuilder {
for (final String otherTopic : otherTopics) {
if (builtPattern.matcher(otherTopic).matches()) {
- throw new TopologyBuilderException(
+ throw new TopologyException(
String.format("Found overlapping regex [%s] matching topic [%s] for a KStream with auto offset resets",
builtPattern.pattern(),
otherTopic));
@@ -995,7 +1004,7 @@ public class InternalTopologyBuilder {
private String decorateTopic(final String topic) {
if (applicationId == null) {
- throw new TopologyBuilderException("there are internal topics and "
+ throw new TopologyException("there are internal topics and "
+ "applicationId hasn't been set. Call "
+ "setApplicationId first");
}
@@ -1397,6 +1406,49 @@ public class InternalTopologyBuilder {
}
}
+ /**
+  * The topics of one topic group: sink topics, source topics, repartition source topics and
+  * state changelog topics.
+  * <p>
+  * {@link #equals(Object)} and {@link #hashCode()} only take {@code sourceTopics} and
+  * {@code stateChangelogTopics} into account.
+  */
+ public static class TopicsInfo {
+     public Set<String> sinkTopics;
+     public Set<String> sourceTopics;
+     public Map<String, InternalTopicConfig> stateChangelogTopics;
+     public Map<String, InternalTopicConfig> repartitionSourceTopics;
+
+     /**
+      * Stores the given collections as-is (no defensive copies are made).
+      *
+      * @param sinkTopics              the sink topics of the group
+      * @param sourceTopics            the source topics of the group
+      * @param repartitionSourceTopics the repartition source topics, keyed by a String
+      * @param stateChangelogTopics    the state changelog topics, keyed by a String
+      */
+     TopicsInfo(final Set<String> sinkTopics,
+                final Set<String> sourceTopics,
+                final Map<String, InternalTopicConfig> repartitionSourceTopics,
+                final Map<String, InternalTopicConfig> stateChangelogTopics) {
+         this.sinkTopics = sinkTopics;
+         this.sourceTopics = sourceTopics;
+         this.stateChangelogTopics = stateChangelogTopics;
+         this.repartitionSourceTopics = repartitionSourceTopics;
+     }
+
+     /**
+      * @return true if {@code o} is a {@code TopicsInfo} with equal {@code sourceTopics} and equal
+      *         {@code stateChangelogTopics}; the other two fields are not compared
+      */
+     @Override
+     public boolean equals(final Object o) {
+         if (o instanceof TopicsInfo) {
+             final TopicsInfo other = (TopicsInfo) o;
+             return other.sourceTopics.equals(sourceTopics) && other.stateChangelogTopics.equals(stateChangelogTopics);
+         } else {
+             return false;
+         }
+     }
+
+     /**
+      * @return a hash over exactly the two fields that {@link #equals(Object)} compares
+      */
+     @Override
+     public int hashCode() {
+         // Objects.hash mixes both field hashes; packing them into a long by hand would let a negative
+         // changelog hash (sign-extended to 64 bits) overwrite the source-topic half entirely.
+         return Objects.hash(sourceTopics, stateChangelogTopics);
+     }
+
+     @Override
+     public String toString() {
+         return "TopicsInfo{" +
+             "sinkTopics=" + sinkTopics +
+             ", sourceTopics=" + sourceTopics +
+             ", repartitionSourceTopics=" + repartitionSourceTopics +
+             ", stateChangelogTopics=" + stateChangelogTopics +
+             '}';
+     }
+ }
+
public final static class TopologyDescription implements org.apache.kafka.streams.TopologyDescription {
private final Set<org.apache.kafka.streams.TopologyDescription.Subtopology> subtopologies = new HashSet<>();
private final Set<org.apache.kafka.streams.TopologyDescription.GlobalStore> globalStores = new HashSet<>();
http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 79c38b0..eb2a171 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -57,6 +57,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
/**
* @throws TopologyBuilderException if an attempt is made to access this state store from an unknown node
*/
+ @SuppressWarnings("deprecation")
@Override
public StateStore getStateStore(final String name) {
if (currentNode() == null) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java
index e26d110..1d9e722 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java
@@ -35,6 +35,7 @@ class SourceNodeRecordDeserializer implements RecordDeserializer {
this.deserializationExceptionHandler = deserializationExceptionHandler;
}
+ @SuppressWarnings("deprecation")
@Override
public ConsumerRecord<Object, Object> deserialize(final ConsumerRecord<byte[], byte[]> rawRecord) {
final Object key;
http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index e8b6a1a..f9ae216 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -30,7 +30,6 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.ClientState;
import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
@@ -331,10 +330,10 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
// parse the topology to determine the repartition source topics,
// making sure they are created with the number of partitions as
// the maximum of the depending sub-topologies source topics' number of partitions
- Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = streamThread.builder.topicGroups();
+ Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = streamThread.builder.topicGroups();
Map<String, InternalTopicMetadata> repartitionTopicMetadata = new HashMap<>();
- for (TopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
+ for (InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
for (InternalTopicConfig topic: topicsInfo.repartitionSourceTopics.values()) {
repartitionTopicMetadata.put(topic.name(), new InternalTopicMetadata(topic));
}
@@ -344,13 +343,13 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
do {
numPartitionsNeeded = false;
- for (TopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
+ for (InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
for (String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
int numPartitions = repartitionTopicMetadata.get(topicName).numPartitions;
// try set the number of partitions for this repartition topic if it is not set yet
if (numPartitions == UNKNOWN) {
- for (TopologyBuilder.TopicsInfo otherTopicsInfo : topicGroups.values()) {
+ for (InternalTopologyBuilder.TopicsInfo otherTopicsInfo : topicGroups.values()) {
Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
if (otherSinkTopics.contains(topicName)) {
@@ -418,7 +417,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
// get the tasks as partition groups from the partition grouper
Set<String> allSourceTopics = new HashSet<>();
Map<Integer, Set<String>> sourceTopicsByGroup = new HashMap<>();
- for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
+ for (Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
allSourceTopics.addAll(entry.getValue().sourceTopics);
sourceTopicsByGroup.put(entry.getKey(), entry.getValue().sourceTopics);
}
@@ -462,7 +461,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
// add tasks to state change log topic subscribers
Map<String, InternalTopicMetadata> changelogTopicMetadata = new HashMap<>();
- for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
+ for (Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
final int topicGroupId = entry.getKey();
final Map<String, InternalTopicConfig> stateChangelogTopics = entry.getValue().stateChangelogTopics;
@@ -646,6 +645,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
*
* @param topicPartitions Map that contains the topic names to be created with the number of partitions
*/
+ @SuppressWarnings("deprecation")
private void prepareTopic(final Map<String, InternalTopicMetadata> topicPartitions) {
log.debug("{} Starting to validate internal topics in partition assignor.", logPrefix);
@@ -775,6 +775,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
this.logPrefix = String.format("stream-thread [%s]", threadName);
}
+ @SuppressWarnings("deprecation")
void validate(final Set<String> copartitionGroup,
final Map<String, InternalTopicMetadata> allRepartitionTopicsNumPartitions,
final Cluster metadata) {