You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/07/28 23:46:39 UTC

[2/3] kafka git commit: KAFKA-5670: (KIP-120) Add Topology and deprecate TopologyBuilder

http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index ce6ba7b..e6c0d6e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -16,11 +16,12 @@
  */
 package org.apache.kafka.streams.processor;
 
-import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
+import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
@@ -33,6 +34,7 @@ import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.Subs
 import org.apache.kafka.streams.state.KeyValueStore;
 
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -46,24 +48,29 @@ import java.util.regex.Pattern;
  * is a node in the graph that receives records from upstream nodes and writes them to a Kafka topic. This builder allows you
  * to construct an acyclic graph of these nodes, and the builder is then passed into a new {@link org.apache.kafka.streams.KafkaStreams}
  * instance that will then {@link org.apache.kafka.streams.KafkaStreams#start() begin consuming, processing, and producing records}.
+ *
+ * @deprecated use {@link Topology} instead
  */
-@InterfaceStability.Evolving
+@Deprecated
 public class TopologyBuilder {
 
     /**
      * NOTE this member would not needed by developers working with the processor APIs, but only used
      * for internal functionalities.
-     * @deprecated not part of public API and for internal usage only
      */
-    @Deprecated
     public final InternalTopologyBuilder internalTopologyBuilder = new InternalTopologyBuilder();
 
+    private Topology.AutoOffsetReset translateAutoOffsetReset(final TopologyBuilder.AutoOffsetReset resetPolicy) {
+        if (resetPolicy == null) {
+            return null;
+        }
+        return resetPolicy == TopologyBuilder.AutoOffsetReset.EARLIEST ? Topology.AutoOffsetReset.EARLIEST : Topology.AutoOffsetReset.LATEST;
+    }
+
     /**
      * NOTE this class would not needed by developers working with the processor APIs, but only used
      * for internal functionalities.
-     * @deprecated not part of public API and for internal usage only
      */
-    @Deprecated
     public static class TopicsInfo {
         public Set<String> sinkTopics;
         public Set<String> sourceTopics;
@@ -108,7 +115,7 @@ public class TopologyBuilder {
     }
 
     /**
-     * Enum used to define auto offset reset policy when creating {@link KStream} or {@link KTable}
+     * Enum used to define auto offset reset policy when creating {@link KStream} or {@link KTable}.
      */
     public enum AutoOffsetReset {
         EARLIEST, LATEST
@@ -119,8 +126,7 @@ public class TopologyBuilder {
      */
     public TopologyBuilder() {}
 
-    /** @deprecated This class is not part of public API and should never be used by a developer. */
-    @Deprecated
+    /** This class is not part of public API and should never be used by a developer. */
     public synchronized final TopologyBuilder setApplicationId(final String applicationId) {
         internalTopologyBuilder.setApplicationId(applicationId);
         return this;
@@ -140,7 +146,11 @@ public class TopologyBuilder {
      */
     public synchronized final TopologyBuilder addSource(final String name,
                                                         final String... topics) {
-        internalTopologyBuilder.addSource(null, name, null, null, null, topics);
+        try {
+            internalTopologyBuilder.addSource(null, name, null, null, null, topics);
+        } catch (final TopologyException e) {
+            throw new TopologyBuilderException(e);
+        }
         return this;
     }
 
@@ -160,7 +170,11 @@ public class TopologyBuilder {
     public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
                                                         final String name,
                                                         final String... topics) {
-        internalTopologyBuilder.addSource(offsetReset, name, null, null, null, topics);
+        try {
+            internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, null, null, null, topics);
+        } catch (final TopologyException e) {
+            throw new TopologyBuilderException(e);
+        }
         return this;
     }
 
@@ -178,8 +192,13 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never null
      */
     public synchronized final TopologyBuilder addSource(final TimestampExtractor timestampExtractor,
-                                                        final String name, final String... topics) {
-        internalTopologyBuilder.addSource(null, name, timestampExtractor, null, null, topics);
+                                                        final String name,
+                                                        final String... topics) {
+        try {
+            internalTopologyBuilder.addSource(null, name, timestampExtractor, null, null, topics);
+        } catch (final TopologyException e) {
+            throw new TopologyBuilderException(e);
+        }
         return this;
     }
 
@@ -199,8 +218,14 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never null
      */
     public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
-                                                        final TimestampExtractor timestampExtractor, final String name, final String... topics) {
-        internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, null, null, topics);
+                                                        final TimestampExtractor timestampExtractor,
+                                                        final String name,
+                                                        final String... topics) {
+        try {
+            internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, timestampExtractor, null, null, topics);
+        } catch (final TopologyException e) {
+            throw new TopologyBuilderException(e);
+        }
         return this;
     }
 
@@ -219,7 +244,11 @@ public class TopologyBuilder {
      */
     public synchronized final TopologyBuilder addSource(final String name,
                                                         final Pattern topicPattern) {
-        internalTopologyBuilder.addSource(null, name, null, null, null, topicPattern);
+        try {
+            internalTopologyBuilder.addSource(null, name, null, null, null, topicPattern);
+        } catch (final TopologyException e) {
+            throw new TopologyBuilderException(e);
+        }
         return this;
     }
 
@@ -240,7 +269,11 @@ public class TopologyBuilder {
     public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
                                                         final String name,
                                                         final Pattern topicPattern) {
-        internalTopologyBuilder.addSource(offsetReset, name, null, null, null, topicPattern);
+        try {
+            internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, null, null, null, topicPattern);
+        } catch (final TopologyException e) {
+            throw new TopologyBuilderException(e);
+        }
         return this;
     }
 
@@ -262,11 +295,14 @@ public class TopologyBuilder {
     public synchronized final TopologyBuilder addSource(final TimestampExtractor timestampExtractor,
                                                         final String name,
                                                         final Pattern topicPattern) {
-        internalTopologyBuilder.addSource(null, name, timestampExtractor, null, null, topicPattern);
+        try {
+            internalTopologyBuilder.addSource(null, name, timestampExtractor, null, null, topicPattern);
+        } catch (final TopologyException e) {
+            throw new TopologyBuilderException(e);
+        }
         return this;
     }
 
-
     /**
      * Add a new source that consumes from topics matching the given pattern
      * and forward the records to child processor and/or sink nodes.
@@ -287,11 +323,14 @@ public class TopologyBuilder {
                                                         final TimestampExtractor timestampExtractor,
                                                         final String name,
                                                         final Pattern topicPattern) {
-        internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, null, null, topicPattern);
+        try {
+            internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, timestampExtractor, null, null, topicPattern);
+        } catch (final TopologyException e) {
+            throw new TopologyBuilderException(e);
+        }
         return this;
     }
 
-
     /**
      * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
      * The source will use the specified key and value deserializers.
@@ -307,12 +346,15 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never null
      * @throws TopologyBuilderException if processor is already added or if topics have already been registered by another source
      */
-
     public synchronized final TopologyBuilder addSource(final String name,
                                                         final Deserializer keyDeserializer,
                                                         final Deserializer valDeserializer,
                                                         final String... topics) {
-        internalTopologyBuilder.addSource(null, name, null, keyDeserializer, valDeserializer, topics);
+        try {
+            internalTopologyBuilder.addSource(null, name, null, keyDeserializer, valDeserializer, topics);
+        } catch (final TopologyException e) {
+            throw new TopologyBuilderException(e);
+        }
         return this;
     }
 
@@ -334,14 +376,17 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never null
      * @throws TopologyBuilderException if processor is already added or if topics have already been registered by another source
      */
-
     public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
                                                         final String name,
                                                         final TimestampExtractor timestampExtractor,
                                                         final Deserializer keyDeserializer,
                                                         final Deserializer valDeserializer,
                                                         final String... topics) {
-        internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, keyDeserializer, valDeserializer, topics);
+        try {
+            internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, timestampExtractor, keyDeserializer, valDeserializer, topics);
+        } catch (final TopologyException e) {
+            throw new TopologyBuilderException(e);
+        }
         return this;
     }
 
@@ -374,7 +419,11 @@ public class TopologyBuilder {
                                                        final String topic,
                                                        final String processorName,
                                                        final ProcessorSupplier stateUpdateSupplier) {
-        internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, null, keyDeserializer, valueDeserializer, topic, processorName, stateUpdateSupplier);
+        try {
+            internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, null, keyDeserializer, valueDeserializer, topic, processorName, stateUpdateSupplier);
+        } catch (final TopologyException e) {
+            throw new TopologyBuilderException(e);
+        }
         return this;
     }
 
@@ -409,7 +458,11 @@ public class TopologyBuilder {
                                                        final String topic,
                                                        final String processorName,
                                                        final ProcessorSupplier stateUpdateSupplier) {
-        internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer, valueDeserializer, topic, processorName, stateUpdateSupplier);
+        try {
+            internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer, valueDeserializer, topic, processorName, stateUpdateSupplier);
+        } catch (final TopologyException e) {
+            throw new TopologyBuilderException(e);
+        }
         return this;
     }
 
@@ -435,7 +488,11 @@ public class TopologyBuilder {
                                                         final Deserializer keyDeserializer,
                                                         final Deserializer valDeserializer,
                                                         final Pattern topicPattern) {
-        internalTopologyBuilder.addSource(null, name, null, keyDeserializer, valDeserializer, topicPattern);
+        try {
+            internalTopologyBuilder.addSource(null, name, null, keyDeserializer, valDeserializer, topicPattern);
+        } catch (final TopologyException e) {
+            throw new TopologyBuilderException(e);
+        }
         return this;
     }
 
@@ -466,7 +523,11 @@ public class TopologyBuilder {
                                                         final Deserializer keyDeserializer,
                                                         final Deserializer valDeserializer,
                                                         final Pattern topicPattern) {
-        internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, keyDeserializer, valDeserializer, topicPattern);
+        try {
+            internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, timestampExtractor, keyDeserializer, valDeserializer, topicPattern);
+        } catch (final TopologyException e) {
+            throw new TopologyBuilderException(e);
+        }
         return this;
     }
 
@@ -494,11 +555,14 @@ public class TopologyBuilder {
                                                         final Deserializer keyDeserializer,
                                                         final Deserializer valDeserializer,
                                                         final Pattern topicPattern) {
-        internalTopologyBuilder.addSource(offsetReset, name, null, keyDeserializer, valDeserializer, topicPattern);
+        try {
+            internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, null, keyDeserializer, valDeserializer, topicPattern);
+        } catch (final TopologyException e) {
+            throw new TopologyBuilderException(e);
+        }
         return this;
     }
 
-
     /**
      * Add a new sink that forwards records from predecessor nodes (processors and/or sources) to the named Kafka topic.
      * The sink will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} and
@@ -517,7 +581,11 @@ public class TopologyBuilder {
     public synchronized final TopologyBuilder addSink(final String name,
                                                       final String topic,
                                                       final String... predecessorNames) {
-        internalTopologyBuilder.addSink(name, topic, null, null, null, predecessorNames);
+        try {
+            internalTopologyBuilder.addSink(name, topic, null, null, null, predecessorNames);
+        } catch (final TopologyException e) {
+            throw new TopologyBuilderException(e);
+        }
         return this;
     }
 
@@ -548,7 +616,11 @@ public class TopologyBuilder {
                                                       final String topic,
                                                       final StreamPartitioner partitioner,
                                                       final String... predecessorNames) {
-        internalTopologyBuilder.addSink(name, topic, null, null, partitioner, predecessorNames);
+        try {
+            internalTopologyBuilder.addSink(name, topic, null, null, partitioner, predecessorNames);
+        } catch (final TopologyException e) {
+            throw new TopologyBuilderException(e);
+        }
         return this;
     }
 
@@ -576,7 +648,11 @@ public class TopologyBuilder {
                                                       final Serializer keySerializer,
                                                       final Serializer valSerializer,
                                                       final String... predecessorNames) {
-        internalTopologyBuilder.addSink(name, topic, keySerializer, valSerializer, null, predecessorNames);
+        try {
+            internalTopologyBuilder.addSink(name, topic, keySerializer, valSerializer, null, predecessorNames);
+        } catch (final TopologyException e) {
+            throw new TopologyBuilderException(e);
+        }
         return this;
     }
 
@@ -607,7 +683,11 @@ public class TopologyBuilder {
                                                              final Serializer<V> valSerializer,
                                                              final StreamPartitioner<? super K, ? super V> partitioner,
                                                              final String... predecessorNames) {
-        internalTopologyBuilder.addSink(name, topic, keySerializer, valSerializer, partitioner, predecessorNames);
+        try {
+            internalTopologyBuilder.addSink(name, topic, keySerializer, valSerializer, partitioner, predecessorNames);
+        } catch (final TopologyException e) {
+            throw new TopologyBuilderException(e);
+        }
         return this;
     }
 
@@ -624,7 +704,11 @@ public class TopologyBuilder {
     public synchronized final TopologyBuilder addProcessor(final String name,
                                                            final ProcessorSupplier supplier,
                                                            final String... predecessorNames) {
-        internalTopologyBuilder.addProcessor(name, supplier, predecessorNames);
+        try {
+            internalTopologyBuilder.addProcessor(name, supplier, predecessorNames);
+        } catch (final TopologyException e) {
+            throw new TopologyBuilderException(e);
+        }
         return this;
     }
 
@@ -637,7 +721,11 @@ public class TopologyBuilder {
      */
     public synchronized final TopologyBuilder addStateStore(final StateStoreSupplier supplier,
                                                             final String... processorNames) {
-        internalTopologyBuilder.addStateStore(supplier, processorNames);
+        try {
+            internalTopologyBuilder.addStateStore(supplier, processorNames);
+        } catch (final TopologyException e) {
+            throw new TopologyBuilderException(e);
+        }
         return this;
     }
 
@@ -650,7 +738,13 @@ public class TopologyBuilder {
      */
     public synchronized final TopologyBuilder connectProcessorAndStateStores(final String processorName,
                                                                              final String... stateStoreNames) {
-        internalTopologyBuilder.connectProcessorAndStateStores(processorName, stateStoreNames);
+        if (stateStoreNames != null && stateStoreNames.length > 0) {
+            try {
+                internalTopologyBuilder.connectProcessorAndStateStores(processorName, stateStoreNames);
+            } catch (final TopologyException e) {
+                throw new TopologyBuilderException(e);
+            }
+        }
         return this;
     }
 
@@ -660,10 +754,7 @@ public class TopologyBuilder {
      *
      * NOTE this function would not needed by developers working with the processor APIs, but only used
      * for the high-level DSL parsing functionalities.
-     *
-     * @deprecated not part of public API and for internal usage only
      */
-    @Deprecated
     protected synchronized final TopologyBuilder connectSourceStoreAndTopic(final String sourceStoreName,
                                                                             final String topic) {
         internalTopologyBuilder.connectSourceStoreAndTopic(sourceStoreName, topic);
@@ -679,9 +770,7 @@ public class TopologyBuilder {
      * @param processorNames the name of the processors
      * @return this builder instance so methods can be chained together; never null
      * @throws TopologyBuilderException if less than two processors are specified, or if one of the processors is not added yet
-     * @deprecated not part of public API and for internal usage only
      */
-    @Deprecated
     public synchronized final TopologyBuilder connectProcessors(final String... processorNames) {
         internalTopologyBuilder.connectProcessors(processorNames);
         return this;
@@ -695,9 +784,7 @@ public class TopologyBuilder {
      *
      * @param topicName the name of the topic
      * @return this builder instance so methods can be chained together; never null
-     * @deprecated not part of public API and for internal usage only
      */
-    @Deprecated
     public synchronized final TopologyBuilder addInternalTopic(final String topicName) {
         internalTopologyBuilder.addInternalTopic(topicName);
         return this;
@@ -711,9 +798,7 @@ public class TopologyBuilder {
      *
      * @param sourceNodes a set of source node names
      * @return this builder instance so methods can be chained together; never null
-     * @deprecated not part of public API and for internal usage only
      */
-    @Deprecated
     public synchronized final TopologyBuilder copartitionSources(final Collection<String> sourceNodes) {
         internalTopologyBuilder.copartitionSources(sourceNodes);
         return this;
@@ -726,9 +811,7 @@ public class TopologyBuilder {
      * for the high-level DSL parsing functionalities.
      *
      * @return groups of node names
-     * @deprecated not part of public API and for internal usage only
      */
-    @Deprecated
     public synchronized Map<Integer, Set<String>> nodeGroups() {
         return internalTopologyBuilder.nodeGroups();
     }
@@ -741,9 +824,7 @@ public class TopologyBuilder {
      * for the high-level DSL parsing functionalities.
      *
      * @see org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig)
-     * @deprecated not part of public API and for internal usage only
      */
-    @Deprecated
     public synchronized ProcessorTopology build(final Integer topicGroupId) {
         return internalTopologyBuilder.build(topicGroupId);
     }
@@ -755,9 +836,7 @@ public class TopologyBuilder {
      * for the high-level DSL parsing functionalities.
      *
      * @return ProcessorTopology
-     * @deprecated not part of public API and for internal usage only
      */
-    @Deprecated
     public synchronized ProcessorTopology buildGlobalStateTopology() {
         return internalTopologyBuilder.buildGlobalStateTopology();
     }
@@ -770,9 +849,7 @@ public class TopologyBuilder {
      * for the high-level DSL parsing functionalities.
      *
      * @return map containing all global {@link StateStore}s
-     * @deprecated not part of public API and for internal usage only
      */
-    @Deprecated
     public Map<String, StateStore> globalStateStores() {
         return internalTopologyBuilder.globalStateStores();
     }
@@ -785,11 +862,22 @@ public class TopologyBuilder {
      * for the high-level DSL parsing functionalities.
      *
      * @return groups of topic names
-     * @deprecated not part of public API and for internal usage only
      */
-    @Deprecated
     public synchronized Map<Integer, TopicsInfo> topicGroups() {
-        return internalTopologyBuilder.topicGroups();
+        final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroupsWithNewTopicsInfo = internalTopologyBuilder.topicGroups();
+        final Map<Integer, TopicsInfo> topicGroupsWithDeprecatedTopicInfo = new HashMap<>();
+
+        for (final Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroupsWithNewTopicsInfo.entrySet()) {
+            final InternalTopologyBuilder.TopicsInfo newTopicsInfo = entry.getValue();
+
+            topicGroupsWithDeprecatedTopicInfo.put(entry.getKey(), new TopicsInfo(
+                newTopicsInfo.sinkTopics,
+                newTopicsInfo.sourceTopics,
+                newTopicsInfo.repartitionSourceTopics,
+                newTopicsInfo.stateChangelogTopics));
+        }
+
+        return topicGroupsWithDeprecatedTopicInfo;
     }
 
     /**
@@ -799,9 +887,7 @@ public class TopologyBuilder {
      * for the high-level DSL parsing functionalities.
      *
      * @return the Pattern for matching all topics reading from earliest offset, never null
-     * @deprecated not part of public API and for internal usage only
      */
-    @Deprecated
     public synchronized Pattern earliestResetTopicsPattern() {
         return internalTopologyBuilder.earliestResetTopicsPattern();
     }
@@ -813,9 +899,7 @@ public class TopologyBuilder {
      * for the high-level DSL parsing functionalities.
      *
      * @return the Pattern for matching all topics reading from latest offset, never null
-     * @deprecated not part of public API and for internal usage only
      */
-    @Deprecated
     public synchronized Pattern latestResetTopicsPattern() {
         return internalTopologyBuilder.latestResetTopicsPattern();
     }
@@ -825,9 +909,7 @@ public class TopologyBuilder {
      * for the high-level DSL parsing functionalities.
      *
      * @return a mapping from state store name to a Set of source Topics.
-     * @deprecated not part of public API and for internal usage only
      */
-    @Deprecated
     public Map<String, List<String>> stateStoreNameToSourceTopics() {
         return internalTopologyBuilder.stateStoreNameToSourceTopics();
     }
@@ -840,9 +922,7 @@ public class TopologyBuilder {
      * for the high-level DSL parsing functionalities.
      *
      * @return groups of topic names
-     * @deprecated not part of public API and for internal usage only
      */
-    @Deprecated
     public synchronized Collection<Set<String>> copartitionGroups() {
         return internalTopologyBuilder.copartitionGroups();
     }
@@ -850,10 +930,7 @@ public class TopologyBuilder {
     /**
      * NOTE this function would not needed by developers working with the processor APIs, but only used
      * for the high-level DSL parsing functionalities.
-     *
-     * @deprecated not part of public API and for internal usage only
      */
-    @Deprecated
     public SubscriptionUpdates subscriptionUpdates() {
         return internalTopologyBuilder.subscriptionUpdates();
     }
@@ -861,10 +938,7 @@ public class TopologyBuilder {
     /**
      * NOTE this function would not needed by developers working with the processor APIs, but only used
      * for the high-level DSL parsing functionalities.
-     *
-     * @deprecated not part of public API and for internal usage only
      */
-    @Deprecated
     public synchronized Pattern sourceTopicPattern() {
         return internalTopologyBuilder.sourceTopicPattern();
     }
@@ -872,10 +946,7 @@ public class TopologyBuilder {
     /**
      * NOTE this function would not needed by developers working with the processor APIs, but only used
      * for the high-level DSL parsing functionalities.
-     *
-     * @deprecated not part of public API and for internal usage only
      */
-    @Deprecated
     public synchronized void updateSubscriptions(final SubscriptionUpdates subscriptionUpdates,
                                                  final String threadId) {
         internalTopologyBuilder.updateSubscriptions(subscriptionUpdates, threadId);

http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index 4c1d350..7925b14 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
-import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
@@ -38,9 +37,6 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext {
         super(new TaskId(-1, -1), config.getString(StreamsConfig.APPLICATION_ID_CONFIG), config, metrics, stateMgr, cache);
     }
 
-    /**
-     * @throws TopologyBuilderException if an attempt is made to access this state store from an unknown node
-     */
     @Override
     public StateStore getStateStore(final String name) {
         return stateManager.getGlobalStore(name);
@@ -95,6 +91,7 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext {
     /**
      * @throws UnsupportedOperationException on every invocation
      */
+    @SuppressWarnings("deprecation")
     @Override
     public void schedule(long interval) {
         throw new UnsupportedOperationException("this should not happen: schedule() not supported in global processor context.");

http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index ff65d31..0d5cd48 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -19,13 +19,13 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.errors.TopologyBuilderException;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TimestampExtractor;
-import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.internals.WindowStoreSupplier;
@@ -207,7 +207,7 @@ public class InternalTopologyBuilder {
                 } else if (topicToPatterns.containsKey(update) && isMatch(update)) {
                     // the same topic cannot be matched to more than one pattern
                     // TODO: we should lift this requirement in the future
-                    throw new TopologyBuilderException("Topic " + update +
+                    throw new TopologyException("Topic " + update +
                         " is already matched for another regex pattern " + topicToPatterns.get(update) +
                         " and hence cannot be matched to this regex pattern " + pattern + " any more.");
                 } else if (isMatch(update)) {
@@ -293,18 +293,18 @@ public class InternalTopologyBuilder {
         return this;
     }
 
-    public final void addSource(final TopologyBuilder.AutoOffsetReset offsetReset,
+    public final void addSource(final Topology.AutoOffsetReset offsetReset,
                                 final String name,
                                 final TimestampExtractor timestampExtractor,
                                 final Deserializer keyDeserializer,
                                 final Deserializer valDeserializer,
                                 final String... topics) {
         if (topics.length == 0) {
-            throw new TopologyBuilderException("You must provide at least one topic");
+            throw new TopologyException("You must provide at least one topic");
         }
         Objects.requireNonNull(name, "name must not be null");
         if (nodeFactories.containsKey(name)) {
-            throw new TopologyBuilderException("Processor " + name + " is already added.");
+            throw new TopologyException("Processor " + name + " is already added.");
         }
 
         for (final String topic : topics) {
@@ -319,67 +319,7 @@ public class InternalTopologyBuilder {
         nodeGrouper.add(name);
     }
 
-    public final void addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
-                                     final String sourceName,
-                                     final TimestampExtractor timestampExtractor,
-                                     final Deserializer keyDeserializer,
-                                     final Deserializer valueDeserializer,
-                                     final String topic,
-                                     final String processorName,
-                                     final ProcessorSupplier stateUpdateSupplier) {
-        Objects.requireNonNull(storeSupplier, "store supplier must not be null");
-        Objects.requireNonNull(sourceName, "sourceName must not be null");
-        Objects.requireNonNull(topic, "topic must not be null");
-        Objects.requireNonNull(stateUpdateSupplier, "supplier must not be null");
-        Objects.requireNonNull(processorName, "processorName must not be null");
-        if (nodeFactories.containsKey(sourceName)) {
-            throw new TopologyBuilderException("Processor " + sourceName + " is already added.");
-        }
-        if (nodeFactories.containsKey(processorName)) {
-            throw new TopologyBuilderException("Processor " + processorName + " is already added.");
-        }
-        if (stateFactories.containsKey(storeSupplier.name()) || globalStateStores.containsKey(storeSupplier.name())) {
-            throw new TopologyBuilderException("StateStore " + storeSupplier.name() + " is already added.");
-        }
-        if (storeSupplier.loggingEnabled()) {
-            throw new TopologyBuilderException("StateStore " + storeSupplier.name() + " for global table must not have logging enabled.");
-        }
-        if (sourceName.equals(processorName)) {
-            throw new TopologyBuilderException("sourceName and processorName must be different.");
-        }
-
-        validateTopicNotAlreadyRegistered(topic);
-
-        globalTopics.add(topic);
-        final String[] topics = {topic};
-        nodeFactories.put(sourceName, new SourceNodeFactory(sourceName, topics, null, timestampExtractor, keyDeserializer, valueDeserializer));
-        nodeToSourceTopics.put(sourceName, Arrays.asList(topics));
-        nodeGrouper.add(sourceName);
-
-        final String[] predecessors = {sourceName};
-        final ProcessorNodeFactory nodeFactory = new ProcessorNodeFactory(processorName, predecessors, stateUpdateSupplier);
-        nodeFactory.addStateStore(storeSupplier.name());
-        nodeFactories.put(processorName, nodeFactory);
-        nodeGrouper.add(processorName);
-        nodeGrouper.unite(processorName, predecessors);
-
-        globalStateStores.put(storeSupplier.name(), storeSupplier.get());
-        connectSourceStoreAndTopic(storeSupplier.name(), topic);
-    }
-
-    private void validateTopicNotAlreadyRegistered(final String topic) {
-        if (sourceTopicNames.contains(topic) || globalTopics.contains(topic)) {
-            throw new TopologyBuilderException("Topic " + topic + " has already been registered by another source.");
-        }
-
-        for (final Pattern pattern : nodeToSourcePatterns.values()) {
-            if (pattern.matcher(topic).matches()) {
-                throw new TopologyBuilderException("Topic " + topic + " matches a Pattern already registered by another source.");
-            }
-        }
-    }
-
-    public final void addSource(final TopologyBuilder.AutoOffsetReset offsetReset,
+    public final void addSource(final Topology.AutoOffsetReset offsetReset,
                                 final String name,
                                 final TimestampExtractor timestampExtractor,
                                 final Deserializer keyDeserializer,
@@ -389,12 +329,12 @@ public class InternalTopologyBuilder {
         Objects.requireNonNull(name, "name can't be null");
 
         if (nodeFactories.containsKey(name)) {
-            throw new TopologyBuilderException("Processor " + name + " is already added.");
+            throw new TopologyException("Processor " + name + " is already added.");
         }
 
         for (final String sourceTopicName : sourceTopicNames) {
             if (topicPattern.matcher(sourceTopicName).matches()) {
-                throw new TopologyBuilderException("Pattern  " + topicPattern + " will match a topic that has already been registered by another source.");
+                throw new TopologyException("Pattern " + topicPattern + " will match a topic that has already been registered by another source.");
             }
         }
 
@@ -414,15 +354,18 @@ public class InternalTopologyBuilder {
         Objects.requireNonNull(name, "name must not be null");
         Objects.requireNonNull(topic, "topic must not be null");
         if (nodeFactories.containsKey(name)) {
-            throw new TopologyBuilderException("Processor " + name + " is already added.");
+            throw new TopologyException("Processor " + name + " is already added.");
         }
 
         for (final String predecessor : predecessorNames) {
             if (predecessor.equals(name)) {
-                throw new TopologyBuilderException("Processor " + name + " cannot be a predecessor of itself.");
+                throw new TopologyException("Processor " + name + " cannot be a predecessor of itself.");
             }
             if (!nodeFactories.containsKey(predecessor)) {
-                throw new TopologyBuilderException("Predecessor processor " + predecessor + " is not added yet.");
+                throw new TopologyException("Predecessor processor " + predecessor + " is not added yet.");
+            }
+            if (nodeToSinkTopic.containsKey(predecessor)) {
+                throw new TopologyException("Sink " + predecessor + " cannot be used as a parent.");
             }
         }
 
@@ -438,15 +381,15 @@ public class InternalTopologyBuilder {
         Objects.requireNonNull(name, "name must not be null");
         Objects.requireNonNull(supplier, "supplier must not be null");
         if (nodeFactories.containsKey(name)) {
-            throw new TopologyBuilderException("Processor " + name + " is already added.");
+            throw new TopologyException("Processor " + name + " is already added.");
         }
 
         for (final String predecessor : predecessorNames) {
             if (predecessor.equals(name)) {
-                throw new TopologyBuilderException("Processor " + name + " cannot be a predecessor of itself.");
+                throw new TopologyException("Processor " + name + " cannot be a predecessor of itself.");
             }
             if (!nodeFactories.containsKey(predecessor)) {
-                throw new TopologyBuilderException("Predecessor processor " + predecessor + " is not added yet.");
+                throw new TopologyException("Predecessor processor " + predecessor + " is not added yet.");
             }
         }
 
@@ -459,7 +402,7 @@ public class InternalTopologyBuilder {
                                     final String... processorNames) {
         Objects.requireNonNull(supplier, "supplier can't be null");
         if (stateFactories.containsKey(supplier.name())) {
-            throw new TopologyBuilderException("StateStore " + supplier.name() + " is already added.");
+            throw new TopologyException("StateStore " + supplier.name() + " is already added.");
         }
 
         stateFactories.put(supplier.name(), new StateStoreFactory(supplier));
@@ -471,43 +414,109 @@ public class InternalTopologyBuilder {
         }
     }
 
+    public final void addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
+                                     final String sourceName,
+                                     final TimestampExtractor timestampExtractor,
+                                     final Deserializer keyDeserializer,
+                                     final Deserializer valueDeserializer,
+                                     final String topic,
+                                     final String processorName,
+                                     final ProcessorSupplier stateUpdateSupplier) {
+        Objects.requireNonNull(storeSupplier, "store supplier must not be null");
+        Objects.requireNonNull(sourceName, "sourceName must not be null");
+        Objects.requireNonNull(topic, "topic must not be null");
+        Objects.requireNonNull(stateUpdateSupplier, "supplier must not be null");
+        Objects.requireNonNull(processorName, "processorName must not be null");
+        if (nodeFactories.containsKey(sourceName)) {
+            throw new TopologyException("Processor " + sourceName + " is already added.");
+        }
+        if (nodeFactories.containsKey(processorName)) {
+            throw new TopologyException("Processor " + processorName + " is already added.");
+        }
+        if (stateFactories.containsKey(storeSupplier.name()) || globalStateStores.containsKey(storeSupplier.name())) {
+            throw new TopologyException("StateStore " + storeSupplier.name() + " is already added.");
+        }
+        if (storeSupplier.loggingEnabled()) {
+            throw new TopologyException("StateStore " + storeSupplier.name() + " for global table must not have logging enabled.");
+        }
+        if (sourceName.equals(processorName)) {
+            throw new TopologyException("sourceName and processorName must be different.");
+        }
+
+        validateTopicNotAlreadyRegistered(topic);
+
+        globalTopics.add(topic);
+        final String[] topics = {topic};
+        nodeFactories.put(sourceName, new SourceNodeFactory(sourceName, topics, null, timestampExtractor, keyDeserializer, valueDeserializer));
+        nodeToSourceTopics.put(sourceName, Arrays.asList(topics));
+        nodeGrouper.add(sourceName);
+
+        final String[] predecessors = {sourceName};
+        final ProcessorNodeFactory nodeFactory = new ProcessorNodeFactory(processorName, predecessors, stateUpdateSupplier);
+        nodeFactory.addStateStore(storeSupplier.name());
+        nodeFactories.put(processorName, nodeFactory);
+        nodeGrouper.add(processorName);
+        nodeGrouper.unite(processorName, predecessors);
+
+        globalStateStores.put(storeSupplier.name(), storeSupplier.get());
+        connectSourceStoreAndTopic(storeSupplier.name(), topic);
+    }
+
+    private void validateTopicNotAlreadyRegistered(final String topic) {
+        if (sourceTopicNames.contains(topic) || globalTopics.contains(topic)) {
+            throw new TopologyException("Topic " + topic + " has already been registered by another source.");
+        }
+
+        for (final Pattern pattern : nodeToSourcePatterns.values()) {
+            if (pattern.matcher(topic).matches()) {
+                throw new TopologyException("Topic " + topic + " matches a Pattern already registered by another source.");
+            }
+        }
+    }
+
     public final void connectProcessorAndStateStores(final String processorName,
                                                      final String... stateStoreNames) {
         Objects.requireNonNull(processorName, "processorName can't be null");
-        if (stateStoreNames != null) {
-            for (final String stateStoreName : stateStoreNames) {
-                connectProcessorAndStateStore(processorName, stateStoreName);
-            }
+        Objects.requireNonNull(stateStoreNames, "stateStoreNames can't be null");
+        if (stateStoreNames.length == 0) {
+            throw new TopologyException("Must provide at least one state store name.");
+        }
+        for (final String stateStoreName : stateStoreNames) {
+            connectProcessorAndStateStore(processorName, stateStoreName);
         }
     }
 
+    // TODO: this method is only used by DSL and we might want to refactor this part
     public final void connectSourceStoreAndTopic(final String sourceStoreName,
                                                   final String topic) {
         if (storeToChangelogTopic.containsKey(sourceStoreName)) {
-            throw new TopologyBuilderException("Source store " + sourceStoreName + " is already added.");
+            throw new TopologyException("Source store " + sourceStoreName + " is already added.");
         }
         storeToChangelogTopic.put(sourceStoreName, topic);
     }
 
+    // TODO: this method is only used by DSL and we might want to refactor this part
     public final void connectProcessors(final String... processorNames) {
         if (processorNames.length < 2) {
-            throw new TopologyBuilderException("At least two processors need to participate in the connection.");
+            throw new TopologyException("At least two processors need to participate in the connection.");
         }
 
         for (final String processorName : processorNames) {
             if (!nodeFactories.containsKey(processorName)) {
-                throw new TopologyBuilderException("Processor " + processorName + " is not added yet.");
+                throw new TopologyException("Processor " + processorName + " is not added yet.");
             }
         }
 
         nodeGrouper.unite(processorNames[0], Arrays.copyOfRange(processorNames, 1, processorNames.length));
     }
 
+    // TODO: this method is only used by DSL and we might want to refactor this part
     public final void addInternalTopic(final String topicName) {
         Objects.requireNonNull(topicName, "topicName can't be null");
         internalTopicNames.add(topicName);
     }
 
+    // TODO: this method is only used by DSL and we might want to refactor this part
     public final void copartitionSources(final Collection<String> sourceNodes) {
         copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
     }
@@ -515,10 +524,10 @@ public class InternalTopologyBuilder {
     private void connectProcessorAndStateStore(final String processorName,
                                                final String stateStoreName) {
         if (!stateFactories.containsKey(stateStoreName)) {
-            throw new TopologyBuilderException("StateStore " + stateStoreName + " is not added yet.");
+            throw new TopologyException("StateStore " + stateStoreName + " is not added yet.");
         }
         if (!nodeFactories.containsKey(processorName)) {
-            throw new TopologyBuilderException("Processor " + processorName + " is not added yet.");
+            throw new TopologyException("Processor " + processorName + " is not added yet.");
         }
 
         final StateStoreFactory stateStoreFactory = stateFactories.get(stateStoreName);
@@ -535,7 +544,7 @@ public class InternalTopologyBuilder {
             processorNodeFactory.addStateStore(stateStoreName);
             connectStateStoreNameToSourceTopicsOrPattern(stateStoreName, processorNodeFactory);
         } else {
-            throw new TopologyBuilderException("cannot connect a state store " + stateStoreName + " to a source node or a sink node.");
+            throw new TopologyException("cannot connect a state store " + stateStoreName + " to a source node or a sink node.");
         }
     }
 
@@ -588,7 +597,7 @@ public class InternalTopologyBuilder {
 
     private <T> void maybeAddToResetList(final Collection<T> earliestResets,
                                          final Collection<T> latestResets,
-                                         final TopologyBuilder.AutoOffsetReset offsetReset,
+                                         final Topology.AutoOffsetReset offsetReset,
                                          final T item) {
         if (offsetReset != null) {
             switch (offsetReset) {
@@ -599,7 +608,7 @@ public class InternalTopologyBuilder {
                     latestResets.add(item);
                     break;
                 default:
-                    throw new TopologyBuilderException(String.format("Unrecognized reset format %s", offsetReset));
+                    throw new TopologyException(String.format("Unrecognized reset format %s", offsetReset));
             }
         }
     }
@@ -759,7 +768,7 @@ public class InternalTopologyBuilder {
                         }
                     }
                 } else {
-                    throw new TopologyBuilderException("Unknown definition class: " + factory.getClass().getName());
+                    throw new TopologyException("Unknown definition class: " + factory.getClass().getName());
                 }
             }
         }
@@ -782,8 +791,8 @@ public class InternalTopologyBuilder {
      *
      * @return groups of topic names
      */
-    public synchronized Map<Integer, TopologyBuilder.TopicsInfo> topicGroups() {
-        final Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = new LinkedHashMap<>();
+    public synchronized Map<Integer, TopicsInfo> topicGroups() {
+        final Map<Integer, TopicsInfo> topicGroups = new LinkedHashMap<>();
 
         if (nodeGroups == null) {
             nodeGroups = makeNodeGroups();
@@ -839,7 +848,7 @@ public class InternalTopologyBuilder {
                 }
             }
             if (!sourceTopics.isEmpty()) {
-                topicGroups.put(entry.getKey(), new TopologyBuilder.TopicsInfo(
+                topicGroups.put(entry.getKey(), new TopicsInfo(
                         Collections.unmodifiableSet(sinkTopics),
                         Collections.unmodifiableSet(sourceTopics),
                         Collections.unmodifiableMap(internalSourceTopics),
@@ -921,7 +930,7 @@ public class InternalTopologyBuilder {
                                       final Set<String> otherTopics) {
         for (final Pattern otherPattern : otherPatterns) {
             if (builtPattern.pattern().contains(otherPattern.pattern())) {
-                throw new TopologyBuilderException(
+                throw new TopologyException(
                     String.format("Found overlapping regex [%s] against [%s] for a KStream with auto offset resets",
                         otherPattern.pattern(),
                         builtPattern.pattern()));
@@ -930,7 +939,7 @@ public class InternalTopologyBuilder {
 
         for (final String otherTopic : otherTopics) {
             if (builtPattern.matcher(otherTopic).matches()) {
-                throw new TopologyBuilderException(
+                throw new TopologyException(
                     String.format("Found overlapping regex [%s] matching topic [%s] for a KStream with auto offset resets",
                         builtPattern.pattern(),
                         otherTopic));
@@ -995,7 +1004,7 @@ public class InternalTopologyBuilder {
 
     private String decorateTopic(final String topic) {
         if (applicationId == null) {
-            throw new TopologyBuilderException("there are internal topics and "
+            throw new TopologyException("there are internal topics and "
                     + "applicationId hasn't been set. Call "
                     + "setApplicationId first");
         }
@@ -1397,6 +1406,49 @@ public class InternalTopologyBuilder {
         }
     }
 
+    public static class TopicsInfo {
+        public Set<String> sinkTopics;
+        public Set<String> sourceTopics;
+        public Map<String, InternalTopicConfig> stateChangelogTopics;
+        public Map<String, InternalTopicConfig> repartitionSourceTopics;
+
+        TopicsInfo(final Set<String> sinkTopics,
+                   final Set<String> sourceTopics,
+                   final Map<String, InternalTopicConfig> repartitionSourceTopics,
+                   final Map<String, InternalTopicConfig> stateChangelogTopics) {
+            this.sinkTopics = sinkTopics;
+            this.sourceTopics = sourceTopics;
+            this.stateChangelogTopics = stateChangelogTopics;
+            this.repartitionSourceTopics = repartitionSourceTopics;
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (o instanceof TopicsInfo) {
+                final TopicsInfo other = (TopicsInfo) o;
+                return other.sourceTopics.equals(sourceTopics) && other.stateChangelogTopics.equals(stateChangelogTopics);
+            } else {
+                return false;
+            }
+        }
+
+        @Override
+        public int hashCode() {
+            final long n = ((long) sourceTopics.hashCode() << 32) | (long) stateChangelogTopics.hashCode();
+            return (int) (n % 0xFFFFFFFFL);
+        }
+
+        @Override
+        public String toString() {
+            return "TopicsInfo{" +
+                "sinkTopics=" + sinkTopics +
+                ", sourceTopics=" + sourceTopics +
+                ", repartitionSourceTopics=" + repartitionSourceTopics +
+                ", stateChangelogTopics=" + stateChangelogTopics +
+                '}';
+        }
+    }
+
     public final static class TopologyDescription implements org.apache.kafka.streams.TopologyDescription {
         private final Set<org.apache.kafka.streams.TopologyDescription.Subtopology> subtopologies = new HashSet<>();
         private final Set<org.apache.kafka.streams.TopologyDescription.GlobalStore> globalStores = new HashSet<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 79c38b0..eb2a171 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -57,6 +57,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
     /**
      * @throws TopologyBuilderException if an attempt is made to access this state store from an unknown node
      */
+    @SuppressWarnings("deprecation")
     @Override
     public StateStore getStateStore(final String name) {
         if (currentNode() == null) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java
index e26d110..1d9e722 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java
@@ -35,6 +35,7 @@ class SourceNodeRecordDeserializer implements RecordDeserializer {
         this.deserializationExceptionHandler = deserializationExceptionHandler;
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public ConsumerRecord<Object, Object> deserialize(final ConsumerRecord<byte[], byte[]> rawRecord) {
         final Object key;

http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index e8b6a1a..f9ae216 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -30,7 +30,6 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.TaskAssignmentException;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
 import org.apache.kafka.streams.processor.internals.assignment.ClientState;
 import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
@@ -331,10 +330,10 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         // parse the topology to determine the repartition source topics,
         // making sure they are created with the number of partitions as
         // the maximum of the depending sub-topologies source topics' number of partitions
-        Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = streamThread.builder.topicGroups();
+        Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = streamThread.builder.topicGroups();
 
         Map<String, InternalTopicMetadata> repartitionTopicMetadata = new HashMap<>();
-        for (TopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
+        for (InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
             for (InternalTopicConfig topic: topicsInfo.repartitionSourceTopics.values()) {
                 repartitionTopicMetadata.put(topic.name(), new InternalTopicMetadata(topic));
             }
@@ -344,13 +343,13 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         do {
             numPartitionsNeeded = false;
 
-            for (TopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
+            for (InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
                 for (String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
                     int numPartitions = repartitionTopicMetadata.get(topicName).numPartitions;
 
                     // try set the number of partitions for this repartition topic if it is not set yet
                     if (numPartitions == UNKNOWN) {
-                        for (TopologyBuilder.TopicsInfo otherTopicsInfo : topicGroups.values()) {
+                        for (InternalTopologyBuilder.TopicsInfo otherTopicsInfo : topicGroups.values()) {
                             Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
 
                             if (otherSinkTopics.contains(topicName)) {
@@ -418,7 +417,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         // get the tasks as partition groups from the partition grouper
         Set<String> allSourceTopics = new HashSet<>();
         Map<Integer, Set<String>> sourceTopicsByGroup = new HashMap<>();
-        for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
+        for (Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
             allSourceTopics.addAll(entry.getValue().sourceTopics);
             sourceTopicsByGroup.put(entry.getKey(), entry.getValue().sourceTopics);
         }
@@ -462,7 +461,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
         // add tasks to state change log topic subscribers
         Map<String, InternalTopicMetadata> changelogTopicMetadata = new HashMap<>();
-        for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
+        for (Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
             final int topicGroupId = entry.getKey();
             final Map<String, InternalTopicConfig> stateChangelogTopics = entry.getValue().stateChangelogTopics;
 
@@ -646,6 +645,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
      *
      * @param topicPartitions Map that contains the topic names to be created with the number of partitions
      */
+    @SuppressWarnings("deprecation")
     private void prepareTopic(final Map<String, InternalTopicMetadata> topicPartitions) {
         log.debug("{} Starting to validate internal topics in partition assignor.", logPrefix);
 
@@ -775,6 +775,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
             this.logPrefix = String.format("stream-thread [%s]", threadName);
         }
 
+        @SuppressWarnings("deprecation")
         void validate(final Set<String> copartitionGroup,
                       final Map<String, InternalTopicMetadata> allRepartitionTopicsNumPartitions,
                       final Cluster metadata) {