You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/07/28 23:46:39 UTC
[2/3] kafka git commit: KAFKA-5670: (KIP-120) Add Topology and
deprecate TopologyBuilder
http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index ce6ba7b..e6c0d6e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -16,11 +16,12 @@
*/
package org.apache.kafka.streams.processor;
-import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.TopologyBuilderException;
+import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
@@ -33,6 +34,7 @@ import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.Subs
import org.apache.kafka.streams.state.KeyValueStore;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -46,24 +48,29 @@ import java.util.regex.Pattern;
* is a node in the graph that receives records from upstream nodes and writes them to a Kafka topic. This builder allows you
* to construct an acyclic graph of these nodes, and the builder is then passed into a new {@link org.apache.kafka.streams.KafkaStreams}
* instance that will then {@link org.apache.kafka.streams.KafkaStreams#start() begin consuming, processing, and producing records}.
+ *
+ * @deprecated use {@link Topology} instead
*/
-@InterfaceStability.Evolving
+@Deprecated
public class TopologyBuilder {
/**
* NOTE this member would not needed by developers working with the processor APIs, but only used
* for internal functionalities.
- * @deprecated not part of public API and for internal usage only
*/
- @Deprecated
public final InternalTopologyBuilder internalTopologyBuilder = new InternalTopologyBuilder();
+ private Topology.AutoOffsetReset translateAutoOffsetReset(final TopologyBuilder.AutoOffsetReset resetPolicy) {
+ if (resetPolicy == null) {
+ return null;
+ }
+ return resetPolicy == TopologyBuilder.AutoOffsetReset.EARLIEST ? Topology.AutoOffsetReset.EARLIEST : Topology.AutoOffsetReset.LATEST;
+ }
+
/**
* NOTE this class would not needed by developers working with the processor APIs, but only used
* for internal functionalities.
- * @deprecated not part of public API and for internal usage only
*/
- @Deprecated
public static class TopicsInfo {
public Set<String> sinkTopics;
public Set<String> sourceTopics;
@@ -108,7 +115,7 @@ public class TopologyBuilder {
}
/**
- * Enum used to define auto offset reset policy when creating {@link KStream} or {@link KTable}
+ * Enum used to define auto offset reset policy when creating {@link KStream} or {@link KTable}.
*/
public enum AutoOffsetReset {
EARLIEST, LATEST
@@ -119,8 +126,7 @@ public class TopologyBuilder {
*/
public TopologyBuilder() {}
- /** @deprecated This class is not part of public API and should never be used by a developer. */
- @Deprecated
+ /** This method is not part of public API and should never be used by a developer. */
public synchronized final TopologyBuilder setApplicationId(final String applicationId) {
internalTopologyBuilder.setApplicationId(applicationId);
return this;
@@ -140,7 +146,11 @@ public class TopologyBuilder {
*/
public synchronized final TopologyBuilder addSource(final String name,
final String... topics) {
- internalTopologyBuilder.addSource(null, name, null, null, null, topics);
+ try {
+ internalTopologyBuilder.addSource(null, name, null, null, null, topics);
+ } catch (final TopologyException e) {
+ throw new TopologyBuilderException(e);
+ }
return this;
}
@@ -160,7 +170,11 @@ public class TopologyBuilder {
public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
final String name,
final String... topics) {
- internalTopologyBuilder.addSource(offsetReset, name, null, null, null, topics);
+ try {
+ internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, null, null, null, topics);
+ } catch (final TopologyException e) {
+ throw new TopologyBuilderException(e);
+ }
return this;
}
@@ -178,8 +192,13 @@ public class TopologyBuilder {
* @return this builder instance so methods can be chained together; never null
*/
public synchronized final TopologyBuilder addSource(final TimestampExtractor timestampExtractor,
- final String name, final String... topics) {
- internalTopologyBuilder.addSource(null, name, timestampExtractor, null, null, topics);
+ final String name,
+ final String... topics) {
+ try {
+ internalTopologyBuilder.addSource(null, name, timestampExtractor, null, null, topics);
+ } catch (final TopologyException e) {
+ throw new TopologyBuilderException(e);
+ }
return this;
}
@@ -199,8 +218,14 @@ public class TopologyBuilder {
* @return this builder instance so methods can be chained together; never null
*/
public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
- final TimestampExtractor timestampExtractor, final String name, final String... topics) {
- internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, null, null, topics);
+ final TimestampExtractor timestampExtractor,
+ final String name,
+ final String... topics) {
+ try {
+ internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, timestampExtractor, null, null, topics);
+ } catch (final TopologyException e) {
+ throw new TopologyBuilderException(e);
+ }
return this;
}
@@ -219,7 +244,11 @@ public class TopologyBuilder {
*/
public synchronized final TopologyBuilder addSource(final String name,
final Pattern topicPattern) {
- internalTopologyBuilder.addSource(null, name, null, null, null, topicPattern);
+ try {
+ internalTopologyBuilder.addSource(null, name, null, null, null, topicPattern);
+ } catch (final TopologyException e) {
+ throw new TopologyBuilderException(e);
+ }
return this;
}
@@ -240,7 +269,11 @@ public class TopologyBuilder {
public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
final String name,
final Pattern topicPattern) {
- internalTopologyBuilder.addSource(offsetReset, name, null, null, null, topicPattern);
+ try {
+ internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, null, null, null, topicPattern);
+ } catch (final TopologyException e) {
+ throw new TopologyBuilderException(e);
+ }
return this;
}
@@ -262,11 +295,14 @@ public class TopologyBuilder {
public synchronized final TopologyBuilder addSource(final TimestampExtractor timestampExtractor,
final String name,
final Pattern topicPattern) {
- internalTopologyBuilder.addSource(null, name, timestampExtractor, null, null, topicPattern);
+ try {
+ internalTopologyBuilder.addSource(null, name, timestampExtractor, null, null, topicPattern);
+ } catch (final TopologyException e) {
+ throw new TopologyBuilderException(e);
+ }
return this;
}
-
/**
* Add a new source that consumes from topics matching the given pattern
* and forward the records to child processor and/or sink nodes.
@@ -287,11 +323,14 @@ public class TopologyBuilder {
final TimestampExtractor timestampExtractor,
final String name,
final Pattern topicPattern) {
- internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, null, null, topicPattern);
+ try {
+ internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, timestampExtractor, null, null, topicPattern);
+ } catch (final TopologyException e) {
+ throw new TopologyBuilderException(e);
+ }
return this;
}
-
/**
* Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
* The source will use the specified key and value deserializers.
@@ -307,12 +346,15 @@ public class TopologyBuilder {
* @return this builder instance so methods can be chained together; never null
* @throws TopologyBuilderException if processor is already added or if topics have already been registered by another source
*/
-
public synchronized final TopologyBuilder addSource(final String name,
final Deserializer keyDeserializer,
final Deserializer valDeserializer,
final String... topics) {
- internalTopologyBuilder.addSource(null, name, null, keyDeserializer, valDeserializer, topics);
+ try {
+ internalTopologyBuilder.addSource(null, name, null, keyDeserializer, valDeserializer, topics);
+ } catch (final TopologyException e) {
+ throw new TopologyBuilderException(e);
+ }
return this;
}
@@ -334,14 +376,17 @@ public class TopologyBuilder {
* @return this builder instance so methods can be chained together; never null
* @throws TopologyBuilderException if processor is already added or if topics have already been registered by another source
*/
-
public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
final String name,
final TimestampExtractor timestampExtractor,
final Deserializer keyDeserializer,
final Deserializer valDeserializer,
final String... topics) {
- internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, keyDeserializer, valDeserializer, topics);
+ try {
+ internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, timestampExtractor, keyDeserializer, valDeserializer, topics);
+ } catch (final TopologyException e) {
+ throw new TopologyBuilderException(e);
+ }
return this;
}
@@ -374,7 +419,11 @@ public class TopologyBuilder {
final String topic,
final String processorName,
final ProcessorSupplier stateUpdateSupplier) {
- internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, null, keyDeserializer, valueDeserializer, topic, processorName, stateUpdateSupplier);
+ try {
+ internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, null, keyDeserializer, valueDeserializer, topic, processorName, stateUpdateSupplier);
+ } catch (final TopologyException e) {
+ throw new TopologyBuilderException(e);
+ }
return this;
}
@@ -409,7 +458,11 @@ public class TopologyBuilder {
final String topic,
final String processorName,
final ProcessorSupplier stateUpdateSupplier) {
- internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer, valueDeserializer, topic, processorName, stateUpdateSupplier);
+ try {
+ internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer, valueDeserializer, topic, processorName, stateUpdateSupplier);
+ } catch (final TopologyException e) {
+ throw new TopologyBuilderException(e);
+ }
return this;
}
@@ -435,7 +488,11 @@ public class TopologyBuilder {
final Deserializer keyDeserializer,
final Deserializer valDeserializer,
final Pattern topicPattern) {
- internalTopologyBuilder.addSource(null, name, null, keyDeserializer, valDeserializer, topicPattern);
+ try {
+ internalTopologyBuilder.addSource(null, name, null, keyDeserializer, valDeserializer, topicPattern);
+ } catch (final TopologyException e) {
+ throw new TopologyBuilderException(e);
+ }
return this;
}
@@ -466,7 +523,11 @@ public class TopologyBuilder {
final Deserializer keyDeserializer,
final Deserializer valDeserializer,
final Pattern topicPattern) {
- internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, keyDeserializer, valDeserializer, topicPattern);
+ try {
+ internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, timestampExtractor, keyDeserializer, valDeserializer, topicPattern);
+ } catch (final TopologyException e) {
+ throw new TopologyBuilderException(e);
+ }
return this;
}
@@ -494,11 +555,14 @@ public class TopologyBuilder {
final Deserializer keyDeserializer,
final Deserializer valDeserializer,
final Pattern topicPattern) {
- internalTopologyBuilder.addSource(offsetReset, name, null, keyDeserializer, valDeserializer, topicPattern);
+ try {
+ internalTopologyBuilder.addSource(translateAutoOffsetReset(offsetReset), name, null, keyDeserializer, valDeserializer, topicPattern);
+ } catch (final TopologyException e) {
+ throw new TopologyBuilderException(e);
+ }
return this;
}
-
/**
* Add a new sink that forwards records from predecessor nodes (processors and/or sources) to the named Kafka topic.
* The sink will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} and
@@ -517,7 +581,11 @@ public class TopologyBuilder {
public synchronized final TopologyBuilder addSink(final String name,
final String topic,
final String... predecessorNames) {
- internalTopologyBuilder.addSink(name, topic, null, null, null, predecessorNames);
+ try {
+ internalTopologyBuilder.addSink(name, topic, null, null, null, predecessorNames);
+ } catch (final TopologyException e) {
+ throw new TopologyBuilderException(e);
+ }
return this;
}
@@ -548,7 +616,11 @@ public class TopologyBuilder {
final String topic,
final StreamPartitioner partitioner,
final String... predecessorNames) {
- internalTopologyBuilder.addSink(name, topic, null, null, partitioner, predecessorNames);
+ try {
+ internalTopologyBuilder.addSink(name, topic, null, null, partitioner, predecessorNames);
+ } catch (final TopologyException e) {
+ throw new TopologyBuilderException(e);
+ }
return this;
}
@@ -576,7 +648,11 @@ public class TopologyBuilder {
final Serializer keySerializer,
final Serializer valSerializer,
final String... predecessorNames) {
- internalTopologyBuilder.addSink(name, topic, keySerializer, valSerializer, null, predecessorNames);
+ try {
+ internalTopologyBuilder.addSink(name, topic, keySerializer, valSerializer, null, predecessorNames);
+ } catch (final TopologyException e) {
+ throw new TopologyBuilderException(e);
+ }
return this;
}
@@ -607,7 +683,11 @@ public class TopologyBuilder {
final Serializer<V> valSerializer,
final StreamPartitioner<? super K, ? super V> partitioner,
final String... predecessorNames) {
- internalTopologyBuilder.addSink(name, topic, keySerializer, valSerializer, partitioner, predecessorNames);
+ try {
+ internalTopologyBuilder.addSink(name, topic, keySerializer, valSerializer, partitioner, predecessorNames);
+ } catch (final TopologyException e) {
+ throw new TopologyBuilderException(e);
+ }
return this;
}
@@ -624,7 +704,11 @@ public class TopologyBuilder {
public synchronized final TopologyBuilder addProcessor(final String name,
final ProcessorSupplier supplier,
final String... predecessorNames) {
- internalTopologyBuilder.addProcessor(name, supplier, predecessorNames);
+ try {
+ internalTopologyBuilder.addProcessor(name, supplier, predecessorNames);
+ } catch (final TopologyException e) {
+ throw new TopologyBuilderException(e);
+ }
return this;
}
@@ -637,7 +721,11 @@ public class TopologyBuilder {
*/
public synchronized final TopologyBuilder addStateStore(final StateStoreSupplier supplier,
final String... processorNames) {
- internalTopologyBuilder.addStateStore(supplier, processorNames);
+ try {
+ internalTopologyBuilder.addStateStore(supplier, processorNames);
+ } catch (final TopologyException e) {
+ throw new TopologyBuilderException(e);
+ }
return this;
}
@@ -650,7 +738,13 @@ public class TopologyBuilder {
*/
public synchronized final TopologyBuilder connectProcessorAndStateStores(final String processorName,
final String... stateStoreNames) {
- internalTopologyBuilder.connectProcessorAndStateStores(processorName, stateStoreNames);
+ if (stateStoreNames != null && stateStoreNames.length > 0) {
+ try {
+ internalTopologyBuilder.connectProcessorAndStateStores(processorName, stateStoreNames);
+ } catch (final TopologyException e) {
+ throw new TopologyBuilderException(e);
+ }
+ }
return this;
}
@@ -660,10 +754,7 @@ public class TopologyBuilder {
*
* NOTE this function would not needed by developers working with the processor APIs, but only used
* for the high-level DSL parsing functionalities.
- *
- * @deprecated not part of public API and for internal usage only
*/
- @Deprecated
protected synchronized final TopologyBuilder connectSourceStoreAndTopic(final String sourceStoreName,
final String topic) {
internalTopologyBuilder.connectSourceStoreAndTopic(sourceStoreName, topic);
@@ -679,9 +770,7 @@ public class TopologyBuilder {
* @param processorNames the name of the processors
* @return this builder instance so methods can be chained together; never null
* @throws TopologyBuilderException if less than two processors are specified, or if one of the processors is not added yet
- * @deprecated not part of public API and for internal usage only
*/
- @Deprecated
public synchronized final TopologyBuilder connectProcessors(final String... processorNames) {
internalTopologyBuilder.connectProcessors(processorNames);
return this;
@@ -695,9 +784,7 @@ public class TopologyBuilder {
*
* @param topicName the name of the topic
* @return this builder instance so methods can be chained together; never null
- * @deprecated not part of public API and for internal usage only
*/
- @Deprecated
public synchronized final TopologyBuilder addInternalTopic(final String topicName) {
internalTopologyBuilder.addInternalTopic(topicName);
return this;
@@ -711,9 +798,7 @@ public class TopologyBuilder {
*
* @param sourceNodes a set of source node names
* @return this builder instance so methods can be chained together; never null
- * @deprecated not part of public API and for internal usage only
*/
- @Deprecated
public synchronized final TopologyBuilder copartitionSources(final Collection<String> sourceNodes) {
internalTopologyBuilder.copartitionSources(sourceNodes);
return this;
@@ -726,9 +811,7 @@ public class TopologyBuilder {
* for the high-level DSL parsing functionalities.
*
* @return groups of node names
- * @deprecated not part of public API and for internal usage only
*/
- @Deprecated
public synchronized Map<Integer, Set<String>> nodeGroups() {
return internalTopologyBuilder.nodeGroups();
}
@@ -741,9 +824,7 @@ public class TopologyBuilder {
* for the high-level DSL parsing functionalities.
*
* @see org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig)
- * @deprecated not part of public API and for internal usage only
*/
- @Deprecated
public synchronized ProcessorTopology build(final Integer topicGroupId) {
return internalTopologyBuilder.build(topicGroupId);
}
@@ -755,9 +836,7 @@ public class TopologyBuilder {
* for the high-level DSL parsing functionalities.
*
* @return ProcessorTopology
- * @deprecated not part of public API and for internal usage only
*/
- @Deprecated
public synchronized ProcessorTopology buildGlobalStateTopology() {
return internalTopologyBuilder.buildGlobalStateTopology();
}
@@ -770,9 +849,7 @@ public class TopologyBuilder {
* for the high-level DSL parsing functionalities.
*
* @return map containing all global {@link StateStore}s
- * @deprecated not part of public API and for internal usage only
*/
- @Deprecated
public Map<String, StateStore> globalStateStores() {
return internalTopologyBuilder.globalStateStores();
}
@@ -785,11 +862,22 @@ public class TopologyBuilder {
* for the high-level DSL parsing functionalities.
*
* @return groups of topic names
- * @deprecated not part of public API and for internal usage only
*/
- @Deprecated
public synchronized Map<Integer, TopicsInfo> topicGroups() {
- return internalTopologyBuilder.topicGroups();
+ final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroupsWithNewTopicsInfo = internalTopologyBuilder.topicGroups();
+ final Map<Integer, TopicsInfo> topicGroupsWithDeprecatedTopicInfo = new HashMap<>();
+
+ for (final Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroupsWithNewTopicsInfo.entrySet()) {
+ final InternalTopologyBuilder.TopicsInfo newTopicsInfo = entry.getValue();
+
+ topicGroupsWithDeprecatedTopicInfo.put(entry.getKey(), new TopicsInfo(
+ newTopicsInfo.sinkTopics,
+ newTopicsInfo.sourceTopics,
+ newTopicsInfo.repartitionSourceTopics,
+ newTopicsInfo.stateChangelogTopics));
+ }
+
+ return topicGroupsWithDeprecatedTopicInfo;
}
/**
@@ -799,9 +887,7 @@ public class TopologyBuilder {
* for the high-level DSL parsing functionalities.
*
* @return the Pattern for matching all topics reading from earliest offset, never null
- * @deprecated not part of public API and for internal usage only
*/
- @Deprecated
public synchronized Pattern earliestResetTopicsPattern() {
return internalTopologyBuilder.earliestResetTopicsPattern();
}
@@ -813,9 +899,7 @@ public class TopologyBuilder {
* for the high-level DSL parsing functionalities.
*
* @return the Pattern for matching all topics reading from latest offset, never null
- * @deprecated not part of public API and for internal usage only
*/
- @Deprecated
public synchronized Pattern latestResetTopicsPattern() {
return internalTopologyBuilder.latestResetTopicsPattern();
}
@@ -825,9 +909,7 @@ public class TopologyBuilder {
* for the high-level DSL parsing functionalities.
*
* @return a mapping from state store name to a Set of source Topics.
- * @deprecated not part of public API and for internal usage only
*/
- @Deprecated
public Map<String, List<String>> stateStoreNameToSourceTopics() {
return internalTopologyBuilder.stateStoreNameToSourceTopics();
}
@@ -840,9 +922,7 @@ public class TopologyBuilder {
* for the high-level DSL parsing functionalities.
*
* @return groups of topic names
- * @deprecated not part of public API and for internal usage only
*/
- @Deprecated
public synchronized Collection<Set<String>> copartitionGroups() {
return internalTopologyBuilder.copartitionGroups();
}
@@ -850,10 +930,7 @@ public class TopologyBuilder {
/**
* NOTE this function would not needed by developers working with the processor APIs, but only used
* for the high-level DSL parsing functionalities.
- *
- * @deprecated not part of public API and for internal usage only
*/
- @Deprecated
public SubscriptionUpdates subscriptionUpdates() {
return internalTopologyBuilder.subscriptionUpdates();
}
@@ -861,10 +938,7 @@ public class TopologyBuilder {
/**
* NOTE this function would not needed by developers working with the processor APIs, but only used
* for the high-level DSL parsing functionalities.
- *
- * @deprecated not part of public API and for internal usage only
*/
- @Deprecated
public synchronized Pattern sourceTopicPattern() {
return internalTopologyBuilder.sourceTopicPattern();
}
@@ -872,10 +946,7 @@ public class TopologyBuilder {
/**
* NOTE this function would not needed by developers working with the processor APIs, but only used
* for the high-level DSL parsing functionalities.
- *
- * @deprecated not part of public API and for internal usage only
*/
- @Deprecated
public synchronized void updateSubscriptions(final SubscriptionUpdates subscriptionUpdates,
final String threadId) {
internalTopologyBuilder.updateSubscriptions(subscriptionUpdates, threadId);
http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index 4c1d350..7925b14 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
-import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
@@ -38,9 +37,6 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext {
super(new TaskId(-1, -1), config.getString(StreamsConfig.APPLICATION_ID_CONFIG), config, metrics, stateMgr, cache);
}
- /**
- * @throws TopologyBuilderException if an attempt is made to access this state store from an unknown node
- */
@Override
public StateStore getStateStore(final String name) {
return stateManager.getGlobalStore(name);
@@ -95,6 +91,7 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext {
/**
* @throws UnsupportedOperationException on every invocation
*/
+ @SuppressWarnings("deprecation")
@Override
public void schedule(long interval) {
throw new UnsupportedOperationException("this should not happen: schedule() not supported in global processor context.");
http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index ff65d31..0d5cd48 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -19,13 +19,13 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.errors.TopologyBuilderException;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TimestampExtractor;
-import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.internals.WindowStoreSupplier;
@@ -207,7 +207,7 @@ public class InternalTopologyBuilder {
} else if (topicToPatterns.containsKey(update) && isMatch(update)) {
// the same topic cannot be matched to more than one pattern
// TODO: we should lift this requirement in the future
- throw new TopologyBuilderException("Topic " + update +
+ throw new TopologyException("Topic " + update +
" is already matched for another regex pattern " + topicToPatterns.get(update) +
" and hence cannot be matched to this regex pattern " + pattern + " any more.");
} else if (isMatch(update)) {
@@ -293,18 +293,18 @@ public class InternalTopologyBuilder {
return this;
}
- public final void addSource(final TopologyBuilder.AutoOffsetReset offsetReset,
+ public final void addSource(final Topology.AutoOffsetReset offsetReset,
final String name,
final TimestampExtractor timestampExtractor,
final Deserializer keyDeserializer,
final Deserializer valDeserializer,
final String... topics) {
if (topics.length == 0) {
- throw new TopologyBuilderException("You must provide at least one topic");
+ throw new TopologyException("You must provide at least one topic");
}
Objects.requireNonNull(name, "name must not be null");
if (nodeFactories.containsKey(name)) {
- throw new TopologyBuilderException("Processor " + name + " is already added.");
+ throw new TopologyException("Processor " + name + " is already added.");
}
for (final String topic : topics) {
@@ -319,67 +319,7 @@ public class InternalTopologyBuilder {
nodeGrouper.add(name);
}
- public final void addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
- final String sourceName,
- final TimestampExtractor timestampExtractor,
- final Deserializer keyDeserializer,
- final Deserializer valueDeserializer,
- final String topic,
- final String processorName,
- final ProcessorSupplier stateUpdateSupplier) {
- Objects.requireNonNull(storeSupplier, "store supplier must not be null");
- Objects.requireNonNull(sourceName, "sourceName must not be null");
- Objects.requireNonNull(topic, "topic must not be null");
- Objects.requireNonNull(stateUpdateSupplier, "supplier must not be null");
- Objects.requireNonNull(processorName, "processorName must not be null");
- if (nodeFactories.containsKey(sourceName)) {
- throw new TopologyBuilderException("Processor " + sourceName + " is already added.");
- }
- if (nodeFactories.containsKey(processorName)) {
- throw new TopologyBuilderException("Processor " + processorName + " is already added.");
- }
- if (stateFactories.containsKey(storeSupplier.name()) || globalStateStores.containsKey(storeSupplier.name())) {
- throw new TopologyBuilderException("StateStore " + storeSupplier.name() + " is already added.");
- }
- if (storeSupplier.loggingEnabled()) {
- throw new TopologyBuilderException("StateStore " + storeSupplier.name() + " for global table must not have logging enabled.");
- }
- if (sourceName.equals(processorName)) {
- throw new TopologyBuilderException("sourceName and processorName must be different.");
- }
-
- validateTopicNotAlreadyRegistered(topic);
-
- globalTopics.add(topic);
- final String[] topics = {topic};
- nodeFactories.put(sourceName, new SourceNodeFactory(sourceName, topics, null, timestampExtractor, keyDeserializer, valueDeserializer));
- nodeToSourceTopics.put(sourceName, Arrays.asList(topics));
- nodeGrouper.add(sourceName);
-
- final String[] predecessors = {sourceName};
- final ProcessorNodeFactory nodeFactory = new ProcessorNodeFactory(processorName, predecessors, stateUpdateSupplier);
- nodeFactory.addStateStore(storeSupplier.name());
- nodeFactories.put(processorName, nodeFactory);
- nodeGrouper.add(processorName);
- nodeGrouper.unite(processorName, predecessors);
-
- globalStateStores.put(storeSupplier.name(), storeSupplier.get());
- connectSourceStoreAndTopic(storeSupplier.name(), topic);
- }
-
- private void validateTopicNotAlreadyRegistered(final String topic) {
- if (sourceTopicNames.contains(topic) || globalTopics.contains(topic)) {
- throw new TopologyBuilderException("Topic " + topic + " has already been registered by another source.");
- }
-
- for (final Pattern pattern : nodeToSourcePatterns.values()) {
- if (pattern.matcher(topic).matches()) {
- throw new TopologyBuilderException("Topic " + topic + " matches a Pattern already registered by another source.");
- }
- }
- }
-
- public final void addSource(final TopologyBuilder.AutoOffsetReset offsetReset,
+ public final void addSource(final Topology.AutoOffsetReset offsetReset,
final String name,
final TimestampExtractor timestampExtractor,
final Deserializer keyDeserializer,
@@ -389,12 +329,12 @@ public class InternalTopologyBuilder {
Objects.requireNonNull(name, "name can't be null");
if (nodeFactories.containsKey(name)) {
- throw new TopologyBuilderException("Processor " + name + " is already added.");
+ throw new TopologyException("Processor " + name + " is already added.");
}
for (final String sourceTopicName : sourceTopicNames) {
if (topicPattern.matcher(sourceTopicName).matches()) {
- throw new TopologyBuilderException("Pattern " + topicPattern + " will match a topic that has already been registered by another source.");
+ throw new TopologyException("Pattern " + topicPattern + " will match a topic that has already been registered by another source.");
}
}
@@ -414,15 +354,18 @@ public class InternalTopologyBuilder {
Objects.requireNonNull(name, "name must not be null");
Objects.requireNonNull(topic, "topic must not be null");
if (nodeFactories.containsKey(name)) {
- throw new TopologyBuilderException("Processor " + name + " is already added.");
+ throw new TopologyException("Processor " + name + " is already added.");
}
for (final String predecessor : predecessorNames) {
if (predecessor.equals(name)) {
- throw new TopologyBuilderException("Processor " + name + " cannot be a predecessor of itself.");
+ throw new TopologyException("Processor " + name + " cannot be a predecessor of itself.");
}
if (!nodeFactories.containsKey(predecessor)) {
- throw new TopologyBuilderException("Predecessor processor " + predecessor + " is not added yet.");
+ throw new TopologyException("Predecessor processor " + predecessor + " is not added yet.");
+ }
+ if (nodeToSinkTopic.containsKey(predecessor)) {
+ throw new TopologyException("Sink " + predecessor + " cannot be used a parent.");
}
}
@@ -438,15 +381,15 @@ public class InternalTopologyBuilder {
Objects.requireNonNull(name, "name must not be null");
Objects.requireNonNull(supplier, "supplier must not be null");
if (nodeFactories.containsKey(name)) {
- throw new TopologyBuilderException("Processor " + name + " is already added.");
+ throw new TopologyException("Processor " + name + " is already added.");
}
for (final String predecessor : predecessorNames) {
if (predecessor.equals(name)) {
- throw new TopologyBuilderException("Processor " + name + " cannot be a predecessor of itself.");
+ throw new TopologyException("Processor " + name + " cannot be a predecessor of itself.");
}
if (!nodeFactories.containsKey(predecessor)) {
- throw new TopologyBuilderException("Predecessor processor " + predecessor + " is not added yet.");
+ throw new TopologyException("Predecessor processor " + predecessor + " is not added yet.");
}
}
@@ -459,7 +402,7 @@ public class InternalTopologyBuilder {
final String... processorNames) {
Objects.requireNonNull(supplier, "supplier can't be null");
if (stateFactories.containsKey(supplier.name())) {
- throw new TopologyBuilderException("StateStore " + supplier.name() + " is already added.");
+ throw new TopologyException("StateStore " + supplier.name() + " is already added.");
}
stateFactories.put(supplier.name(), new StateStoreFactory(supplier));
@@ -471,43 +414,109 @@ public class InternalTopologyBuilder {
}
}
+ public final void addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier, // registers a source node, a processor node and a global state store in one step
+ final String sourceName,
+ final TimestampExtractor timestampExtractor,
+ final Deserializer keyDeserializer,
+ final Deserializer valueDeserializer,
+ final String topic,
+ final String processorName,
+ final ProcessorSupplier stateUpdateSupplier) {
+ Objects.requireNonNull(storeSupplier, "store supplier must not be null"); // timestampExtractor and the two deserializers are the only arguments not null-checked here
+ Objects.requireNonNull(sourceName, "sourceName must not be null");
+ Objects.requireNonNull(topic, "topic must not be null");
+ Objects.requireNonNull(stateUpdateSupplier, "supplier must not be null");
+ Objects.requireNonNull(processorName, "processorName must not be null");
+ if (nodeFactories.containsKey(sourceName)) { // both node names must still be unused
+ throw new TopologyException("Processor " + sourceName + " is already added.");
+ }
+ if (nodeFactories.containsKey(processorName)) {
+ throw new TopologyException("Processor " + processorName + " is already added.");
+ }
+ if (stateFactories.containsKey(storeSupplier.name()) || globalStateStores.containsKey(storeSupplier.name())) { // store name must be unique across stateFactories and globalStateStores
+ throw new TopologyException("StateStore " + storeSupplier.name() + " is already added.");
+ }
+ if (storeSupplier.loggingEnabled()) { // a supplier with logging enabled is rejected for a global store
+ throw new TopologyException("StateStore " + storeSupplier.name() + " for global table must not have logging enabled.");
+ }
+ if (sourceName.equals(processorName)) {
+ throw new TopologyException("sourceName and processorName must be different.");
+ }
+
+ validateTopicNotAlreadyRegistered(topic); // last validation step; nothing has been mutated before this point
+
+ globalTopics.add(topic);
+ final String[] topics = {topic};
+ nodeFactories.put(sourceName, new SourceNodeFactory(sourceName, topics, null, timestampExtractor, keyDeserializer, valueDeserializer)); // third argument is null; presumably the topic Pattern slot - TODO confirm against SourceNodeFactory
+ nodeToSourceTopics.put(sourceName, Arrays.asList(topics));
+ nodeGrouper.add(sourceName);
+
+ final String[] predecessors = {sourceName}; // the new source node is the processor's only predecessor
+ final ProcessorNodeFactory nodeFactory = new ProcessorNodeFactory(processorName, predecessors, stateUpdateSupplier);
+ nodeFactory.addStateStore(storeSupplier.name()); // attach the store to the processor node by name
+ nodeFactories.put(processorName, nodeFactory);
+ nodeGrouper.add(processorName);
+ nodeGrouper.unite(processorName, predecessors); // processor and source end up in the same node group
+
+ globalStateStores.put(storeSupplier.name(), storeSupplier.get()); // the store instance is created here via storeSupplier.get()
+ connectSourceStoreAndTopic(storeSupplier.name(), topic); // associates the store name with the source topic
+ }
+
+ private void validateTopicNotAlreadyRegistered(final String topic) { // throws TopologyException if the topic collides with an existing registration; otherwise returns normally
+ if (sourceTopicNames.contains(topic) || globalTopics.contains(topic)) { // exact-name clash with a source topic or a global topic
+ throw new TopologyException("Topic " + topic + " has already been registered by another source.");
+ }
+
+ for (final Pattern pattern : nodeToSourcePatterns.values()) { // clash with any registered source pattern
+ if (pattern.matcher(topic).matches()) { // matches() requires the whole topic name to match the pattern
+ throw new TopologyException("Topic " + topic + " matches a Pattern already registered by another source.");
+ }
+ }
+ }
+
public final void connectProcessorAndStateStores(final String processorName,
final String... stateStoreNames) {
Objects.requireNonNull(processorName, "processorName can't be null");
- if (stateStoreNames != null) {
- for (final String stateStoreName : stateStoreNames) {
- connectProcessorAndStateStore(processorName, stateStoreName);
- }
+ Objects.requireNonNull(stateStoreNames, "stateStoreNames can't be null");
+ if (stateStoreNames.length == 0) {
+ throw new TopologyException("Must provide at least one state store name.");
+ }
+ for (final String stateStoreName : stateStoreNames) {
+ connectProcessorAndStateStore(processorName, stateStoreName);
}
}
+ // TODO: this method is only used by DSL and we might want to refactor this part
public final void connectSourceStoreAndTopic(final String sourceStoreName,
final String topic) {
if (storeToChangelogTopic.containsKey(sourceStoreName)) {
- throw new TopologyBuilderException("Source store " + sourceStoreName + " is already added.");
+ throw new TopologyException("Source store " + sourceStoreName + " is already added.");
}
storeToChangelogTopic.put(sourceStoreName, topic);
}
+ // TODO: this method is only used by DSL and we might want to refactor this part
public final void connectProcessors(final String... processorNames) {
if (processorNames.length < 2) {
- throw new TopologyBuilderException("At least two processors need to participate in the connection.");
+ throw new TopologyException("At least two processors need to participate in the connection.");
}
for (final String processorName : processorNames) {
if (!nodeFactories.containsKey(processorName)) {
- throw new TopologyBuilderException("Processor " + processorName + " is not added yet.");
+ throw new TopologyException("Processor " + processorName + " is not added yet.");
}
}
nodeGrouper.unite(processorNames[0], Arrays.copyOfRange(processorNames, 1, processorNames.length));
}
+ // TODO: this method is only used by DSL and we might want to refactor this part
public final void addInternalTopic(final String topicName) {
Objects.requireNonNull(topicName, "topicName can't be null");
internalTopicNames.add(topicName);
}
+ // TODO: this method is only used by DSL and we might want to refactor this part
public final void copartitionSources(final Collection<String> sourceNodes) {
copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
}
@@ -515,10 +524,10 @@ public class InternalTopologyBuilder {
private void connectProcessorAndStateStore(final String processorName,
final String stateStoreName) {
if (!stateFactories.containsKey(stateStoreName)) {
- throw new TopologyBuilderException("StateStore " + stateStoreName + " is not added yet.");
+ throw new TopologyException("StateStore " + stateStoreName + " is not added yet.");
}
if (!nodeFactories.containsKey(processorName)) {
- throw new TopologyBuilderException("Processor " + processorName + " is not added yet.");
+ throw new TopologyException("Processor " + processorName + " is not added yet.");
}
final StateStoreFactory stateStoreFactory = stateFactories.get(stateStoreName);
@@ -535,7 +544,7 @@ public class InternalTopologyBuilder {
processorNodeFactory.addStateStore(stateStoreName);
connectStateStoreNameToSourceTopicsOrPattern(stateStoreName, processorNodeFactory);
} else {
- throw new TopologyBuilderException("cannot connect a state store " + stateStoreName + " to a source node or a sink node.");
+ throw new TopologyException("cannot connect a state store " + stateStoreName + " to a source node or a sink node.");
}
}
@@ -588,7 +597,7 @@ public class InternalTopologyBuilder {
private <T> void maybeAddToResetList(final Collection<T> earliestResets,
final Collection<T> latestResets,
- final TopologyBuilder.AutoOffsetReset offsetReset,
+ final Topology.AutoOffsetReset offsetReset,
final T item) {
if (offsetReset != null) {
switch (offsetReset) {
@@ -599,7 +608,7 @@ public class InternalTopologyBuilder {
latestResets.add(item);
break;
default:
- throw new TopologyBuilderException(String.format("Unrecognized reset format %s", offsetReset));
+ throw new TopologyException(String.format("Unrecognized reset format %s", offsetReset));
}
}
}
@@ -759,7 +768,7 @@ public class InternalTopologyBuilder {
}
}
} else {
- throw new TopologyBuilderException("Unknown definition class: " + factory.getClass().getName());
+ throw new TopologyException("Unknown definition class: " + factory.getClass().getName());
}
}
}
@@ -782,8 +791,8 @@ public class InternalTopologyBuilder {
*
* @return groups of topic names
*/
- public synchronized Map<Integer, TopologyBuilder.TopicsInfo> topicGroups() {
- final Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = new LinkedHashMap<>();
+ public synchronized Map<Integer, TopicsInfo> topicGroups() {
+ final Map<Integer, TopicsInfo> topicGroups = new LinkedHashMap<>();
if (nodeGroups == null) {
nodeGroups = makeNodeGroups();
@@ -839,7 +848,7 @@ public class InternalTopologyBuilder {
}
}
if (!sourceTopics.isEmpty()) {
- topicGroups.put(entry.getKey(), new TopologyBuilder.TopicsInfo(
+ topicGroups.put(entry.getKey(), new TopicsInfo(
Collections.unmodifiableSet(sinkTopics),
Collections.unmodifiableSet(sourceTopics),
Collections.unmodifiableMap(internalSourceTopics),
@@ -921,7 +930,7 @@ public class InternalTopologyBuilder {
final Set<String> otherTopics) {
for (final Pattern otherPattern : otherPatterns) {
if (builtPattern.pattern().contains(otherPattern.pattern())) {
- throw new TopologyBuilderException(
+ throw new TopologyException(
String.format("Found overlapping regex [%s] against [%s] for a KStream with auto offset resets",
otherPattern.pattern(),
builtPattern.pattern()));
@@ -930,7 +939,7 @@ public class InternalTopologyBuilder {
for (final String otherTopic : otherTopics) {
if (builtPattern.matcher(otherTopic).matches()) {
- throw new TopologyBuilderException(
+ throw new TopologyException(
String.format("Found overlapping regex [%s] matching topic [%s] for a KStream with auto offset resets",
builtPattern.pattern(),
otherTopic));
@@ -995,7 +1004,7 @@ public class InternalTopologyBuilder {
private String decorateTopic(final String topic) {
if (applicationId == null) {
- throw new TopologyBuilderException("there are internal topics and "
+ throw new TopologyException("there are internal topics and "
+ "applicationId hasn't been set. Call "
+ "setApplicationId first");
}
@@ -1397,6 +1406,49 @@ public class InternalTopologyBuilder {
}
}
+ public static class TopicsInfo { // holder for the four topic collections passed to the constructor; fields are public and non-final
+ public Set<String> sinkTopics;
+ public Set<String> sourceTopics;
+ public Map<String, InternalTopicConfig> stateChangelogTopics;
+ public Map<String, InternalTopicConfig> repartitionSourceTopics;
+
+ TopicsInfo(final Set<String> sinkTopics, // package-private constructor
+ final Set<String> sourceTopics,
+ final Map<String, InternalTopicConfig> repartitionSourceTopics, // note: repartition map is the 3rd argument, changelog map the 4th - the reverse of the field declaration order
+ final Map<String, InternalTopicConfig> stateChangelogTopics) {
+ this.sinkTopics = sinkTopics; // references are stored as given; no defensive copy is made
+ this.sourceTopics = sourceTopics;
+ this.stateChangelogTopics = stateChangelogTopics;
+ this.repartitionSourceTopics = repartitionSourceTopics;
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (o instanceof TopicsInfo) {
+ final TopicsInfo other = (TopicsInfo) o;
+ return other.sourceTopics.equals(sourceTopics) && other.stateChangelogTopics.equals(stateChangelogTopics); // only these two fields are compared; sinkTopics and repartitionSourceTopics are ignored
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ final long n = ((long) sourceTopics.hashCode() << 32) | (long) stateChangelogTopics.hashCode(); // built from the same two fields that equals() compares
+ return (int) (n % 0xFFFFFFFFL); // reduce modulo 0xFFFFFFFF, then narrow to int
+ }
+
+ @Override
+ public String toString() { // unlike equals()/hashCode(), prints all four fields
+ return "TopicsInfo{" +
+ "sinkTopics=" + sinkTopics +
+ ", sourceTopics=" + sourceTopics +
+ ", repartitionSourceTopics=" + repartitionSourceTopics +
+ ", stateChangelogTopics=" + stateChangelogTopics +
+ '}';
+ }
+ }
+
public final static class TopologyDescription implements org.apache.kafka.streams.TopologyDescription {
private final Set<org.apache.kafka.streams.TopologyDescription.Subtopology> subtopologies = new HashSet<>();
private final Set<org.apache.kafka.streams.TopologyDescription.GlobalStore> globalStores = new HashSet<>();
http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 79c38b0..eb2a171 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -57,6 +57,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
/**
* @throws TopologyBuilderException if an attempt is made to access this state store from an unknown node
*/
+ @SuppressWarnings("deprecation")
@Override
public StateStore getStateStore(final String name) {
if (currentNode() == null) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java
index e26d110..1d9e722 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java
@@ -35,6 +35,7 @@ class SourceNodeRecordDeserializer implements RecordDeserializer {
this.deserializationExceptionHandler = deserializationExceptionHandler;
}
+ @SuppressWarnings("deprecation")
@Override
public ConsumerRecord<Object, Object> deserialize(final ConsumerRecord<byte[], byte[]> rawRecord) {
final Object key;
http://git-wip-us.apache.org/repos/asf/kafka/blob/1844bf2b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index e8b6a1a..f9ae216 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -30,7 +30,6 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.ClientState;
import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
@@ -331,10 +330,10 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
// parse the topology to determine the repartition source topics,
// making sure they are created with the number of partitions as
// the maximum of the depending sub-topologies source topics' number of partitions
- Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = streamThread.builder.topicGroups();
+ Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = streamThread.builder.topicGroups();
Map<String, InternalTopicMetadata> repartitionTopicMetadata = new HashMap<>();
- for (TopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
+ for (InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
for (InternalTopicConfig topic: topicsInfo.repartitionSourceTopics.values()) {
repartitionTopicMetadata.put(topic.name(), new InternalTopicMetadata(topic));
}
@@ -344,13 +343,13 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
do {
numPartitionsNeeded = false;
- for (TopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
+ for (InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
for (String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
int numPartitions = repartitionTopicMetadata.get(topicName).numPartitions;
// try set the number of partitions for this repartition topic if it is not set yet
if (numPartitions == UNKNOWN) {
- for (TopologyBuilder.TopicsInfo otherTopicsInfo : topicGroups.values()) {
+ for (InternalTopologyBuilder.TopicsInfo otherTopicsInfo : topicGroups.values()) {
Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
if (otherSinkTopics.contains(topicName)) {
@@ -418,7 +417,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
// get the tasks as partition groups from the partition grouper
Set<String> allSourceTopics = new HashSet<>();
Map<Integer, Set<String>> sourceTopicsByGroup = new HashMap<>();
- for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
+ for (Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
allSourceTopics.addAll(entry.getValue().sourceTopics);
sourceTopicsByGroup.put(entry.getKey(), entry.getValue().sourceTopics);
}
@@ -462,7 +461,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
// add tasks to state change log topic subscribers
Map<String, InternalTopicMetadata> changelogTopicMetadata = new HashMap<>();
- for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
+ for (Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
final int topicGroupId = entry.getKey();
final Map<String, InternalTopicConfig> stateChangelogTopics = entry.getValue().stateChangelogTopics;
@@ -646,6 +645,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
*
* @param topicPartitions Map that contains the topic names to be created with the number of partitions
*/
+ @SuppressWarnings("deprecation")
private void prepareTopic(final Map<String, InternalTopicMetadata> topicPartitions) {
log.debug("{} Starting to validate internal topics in partition assignor.", logPrefix);
@@ -775,6 +775,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
this.logPrefix = String.format("stream-thread [%s]", threadName);
}
+ @SuppressWarnings("deprecation")
void validate(final Set<String> copartitionGroup,
final Map<String, InternalTopicMetadata> allRepartitionTopicsNumPartitions,
final Cluster metadata) {