You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/07/24 18:03:34 UTC
[5/5] kafka git commit: KAFKA-3856 (KIP-120) step two: extract
internal functions from public facing TopologyBuilder class
KAFKA-3856 (KIP-120) step two: extract internal functions from public facing TopologyBuilder class
- extract InternalTopologyBuilder from TopologyBuilder
- deprecate all "leaking" methods from public TopologyBuilder API
- changed TopologyDescription and all nested classes into interfaces
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Eno Thereska <en...@gmail.com>, Bill Bejeck <bb...@gmail.com>, Guozhang Wang <wa...@gmail.com>
Closes #3536 from mjsax/kafka-3856-extract-internal-topology-builder
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5d798511
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5d798511
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5d798511
Branch: refs/heads/trunk
Commit: 5d798511b12c5ef7555e4234fdd99a360176e435
Parents: fc93fb4
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Mon Jul 24 11:03:27 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Jul 24 11:03:27 2017 -0700
----------------------------------------------------------------------
.../org/apache/kafka/streams/KafkaStreams.java | 4 +-
.../kafka/streams/TopologyDescription.java | 152 ++
.../streams/processor/TopologyBuilder.java | 1265 +++------------
.../streams/processor/TopologyDescription.java | 476 ------
.../internals/InternalTopologyBuilder.java | 1491 ++++++++++++++++++
.../internals/StreamPartitionAssignor.java | 9 +-
.../processor/internals/StreamThread.java | 7 +-
.../internals/StreamsMetadataState.java | 5 +-
.../apache/kafka/streams/KafkaStreamsTest.java | 31 +-
.../integration/RegexSourceIntegrationTest.java | 49 +-
.../streams/processor/TopologyBuilderTest.java | 10 +-
.../kafka/streams/processor/TopologyTest.java | 87 +-
.../internals/InternalTopologyBuilderTest.java | 709 +++++++++
.../internals/ProcessorTopologyTest.java | 34 +-
.../internals/StreamPartitionAssignorTest.java | 289 +++-
.../processor/internals/StreamThreadTest.java | 151 +-
.../internals/StreamsMetadataStateTest.java | 8 +-
.../StreamThreadStateStoreProviderTest.java | 4 +-
.../kafka/test/ProcessorTopologyTestDriver.java | 7 +-
19 files changed, 3016 insertions(+), 1772 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/5d798511/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 028713b..c7c67d5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -452,7 +452,7 @@ public class KafkaStreams {
GlobalStreamThread.State globalThreadState = null;
final ArrayList<StateStoreProvider> storeProviders = new ArrayList<>();
- streamsMetadataState = new StreamsMetadataState(builder, parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
+ streamsMetadataState = new StreamsMetadataState(builder.internalTopologyBuilder, parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
final ProcessorTopology globalTaskTopology = builder.buildGlobalStateTopology();
@@ -476,7 +476,7 @@ public class KafkaStreams {
}
for (int i = 0; i < threads.length; i++) {
- threads[i] = new StreamThread(builder,
+ threads[i] = new StreamThread(builder.internalTopologyBuilder,
config,
clientSupplier,
applicationId,
http://git-wip-us.apache.org/repos/asf/kafka/blob/5d798511/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
new file mode 100644
index 0000000..dd481ff
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.streams.processor.internals.StreamTask;
+
+import java.util.Set;
+
+/**
+ * A meta representation of a {@link Topology topology}.
+ * <p>
+ * The nodes of a topology are grouped into {@link Subtopology sub-topologies} if they are connected.
+ * In contrast, two sub-topologies are not connected but can be linked to each other via topics, i.e., if one
+ * sub-topology {@link Topology#addSink(String, String, String...) writes} into a topic and another sub-topology
+ * {@link Topology#addSource(String, String...) reads} from the same topic.
+ * <p>
+ * When {@link KafkaStreams#start()} is called, different sub-topologies will be constructed and executed as independent
+ * {@link StreamTask tasks}.
+ */
+public interface TopologyDescription {
+ /**
+ * A connected sub-graph of a {@link Topology}.
+ * <p>
+ * Nodes of a {@code Subtopology} are connected {@link Topology#addProcessor(String, ProcessorSupplier, String...)
+ * directly} or indirectly via {@link Topology#connectProcessorAndStateStores(String, String...) state stores}
+ * (i.e., if multiple processors share the same state).
+ */
+ interface Subtopology {
+ /**
+ * Internally assigned unique ID.
+ * @return the ID of the sub-topology
+ */
+ int id();
+
+ /**
+ * All nodes of this sub-topology.
+ * @return set of all nodes within the sub-topology
+ */
+ Set<Node> nodes();
+ }
+
+ /**
+ * Represents a {@link Topology#addGlobalStore(StateStoreSupplier, String,
+ * org.apache.kafka.common.serialization.Deserializer, org.apache.kafka.common.serialization.Deserializer, String,
+ * String, ProcessorSupplier) global store}.
+ * Adding a global store results in adding a source node and one stateful processor node.
+ * Note that all added global stores form a single unit (similar to a {@link Subtopology}) even if different
+ * global stores are not connected to each other.
+ * Furthermore, global stores are available to all processors without connecting them explicitly, and thus global
+ * stores will never be part of any {@link Subtopology}.
+ */
+ interface GlobalStore {
+ /**
+ * The source node reading from a "global" topic.
+ * @return the "global" source node
+ */
+ Source source();
+
+ /**
+ * The processor node maintaining the global store.
+ * @return the "global" processor node
+ */
+ Processor processor();
+ }
+
+ /**
+ * A node of a topology. Can be a source, sink, or processor node.
+ */
+ interface Node {
+ /**
+ * The name of the node. Will never be {@code null}.
+ * @return the name of the node
+ */
+ String name();
+ /**
+ * The predecessors of this node within a sub-topology.
+ * Note, sources do not have any predecessors.
+ * Will never be {@code null}.
+ * @return set of all predecessors
+ */
+ Set<Node> predecessors();
+ /**
+ * The successors of this node within a sub-topology.
+ * Note, sinks do not have any successors.
+ * Will never be {@code null}.
+ * @return set of all successors
+ */
+ Set<Node> successors();
+ }
+
+
+ /**
+ * A source node of a topology.
+ */
+ interface Source extends Node {
+ /**
+ * The topic names this source node is reading from.
+ * @return comma-separated list of topic names or pattern (as String)
+ */
+ String topics();
+ }
+
+ /**
+ * A processor node of a topology.
+ */
+ interface Processor extends Node {
+ /**
+ * The names of all connected stores.
+ * @return set of store names
+ */
+ Set<String> stores();
+ }
+
+ /**
+ * A sink node of a topology.
+ */
+ interface Sink extends Node {
+ /**
+ * The topic name this sink node is writing to.
+ * @return a topic name
+ */
+ String topic();
+ }
+
+ /**
+ * All sub-topologies of the represented topology.
+ * @return set of all sub-topologies
+ */
+ Set<Subtopology> subtopologies();
+
+ /**
+ * All global stores of the represented topology.
+ * @return set of all global stores
+ */
+ Set<GlobalStore> globalStores();
+
+}
+