You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/06/23 12:32:03 UTC

[GitHub] [kafka] guozhangwang commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies

guozhangwang commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r657029488



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -799,41 +795,41 @@ public KafkaStreams(final Topology topology,
     public KafkaStreams(final Topology topology,
                         final StreamsConfig config,
                         final Time time) {
-        this(topology.internalTopologyBuilder, config, new DefaultKafkaClientSupplier(), time);
+        this(new TopologyMetadata(topology.internalTopologyBuilder, config), config, new DefaultKafkaClientSupplier(), time);
     }
 
-    private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
-                         final StreamsConfig config,
-                         final KafkaClientSupplier clientSupplier) throws StreamsException {
-        this(internalTopologyBuilder, config, clientSupplier, Time.SYSTEM);
-    }
-
-    private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
+    private KafkaStreams(final Topology topology,
                          final StreamsConfig config,
                          final KafkaClientSupplier clientSupplier,
                          final Time time) throws StreamsException {
+        this(new TopologyMetadata(topology.internalTopologyBuilder, config), config, clientSupplier, time);
+    }
+
+    protected KafkaStreams(final TopologyMetadata topologyMetadata,

Review comment:
       Do these two functions need to be `protected` rather than `private`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -16,10 +16,294 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO KAFKA-12648:
+//  1) synchronize on these methods instead of individual InternalTopologyBuilder methods, where applicable
 
 public class TopologyMetadata {
-    //TODO KAFKA-12648: the TopologyMetadata class is filled in by Pt. 2 (PR #10683)
+    private final Logger log = LoggerFactory.getLogger(TopologyMetadata.class);
+
+    // the '_' character is not allowed for topology names, thus it's safe to use to indicate that it's not a named topology
+    private static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";
+    private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile("");
+
+    private final StreamsConfig config;
+    private final SortedMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability
+
+    private ProcessorTopology globalTopology;
+    private Map<String, StateStore> globalStateStores = new HashMap<>();
+    final Set<String> allInputTopics = new HashSet<>();
+
+    public TopologyMetadata(final InternalTopologyBuilder builder, final StreamsConfig config) {
+        this.config = config;
+        builders = new TreeMap<>();
+        if (builder.hasNamedTopology()) {
+            builders.put(builder.namedTopology(), builder);
+        } else {
+            builders.put(UNNAMED_TOPOLOGY, builder);
+        }
+    }
+
+    public TopologyMetadata(final SortedMap<String, InternalTopologyBuilder> builders, final StreamsConfig config) {
+        this.config = config;
+        this.builders = builders;
+        if (builders.isEmpty()) {
+            log.debug("Building KafkaStreams app with no empty topology");
+        }
+    }
+
+    public int getNumStreamThreads(final StreamsConfig config) {
+        final int configuredNumStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+
+        // If the application uses named topologies, it's possible to start up with no topologies at all and only add them later
+        if (builders.isEmpty()) {
+            if (configuredNumStreamThreads != 0) {

Review comment:
       Maybe we should just require `atLeast(1)` in the StreamsConfig definition? Then here we would only need to check the first condition.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -16,10 +16,294 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO KAFKA-12648:
+//  1) synchronize on these methods instead of individual InternalTopologyBuilder methods, where applicable
 
 public class TopologyMetadata {
-    //TODO KAFKA-12648: the TopologyMetadata class is filled in by Pt. 2 (PR #10683)
+    private final Logger log = LoggerFactory.getLogger(TopologyMetadata.class);
+
+    // the '_' character is not allowed for topology names, thus it's safe to use to indicate that it's not a named topology
+    private static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";
+    private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile("");
+
+    private final StreamsConfig config;
+    private final SortedMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability
+
+    private ProcessorTopology globalTopology;
+    private Map<String, StateStore> globalStateStores = new HashMap<>();
+    final Set<String> allInputTopics = new HashSet<>();
+
+    public TopologyMetadata(final InternalTopologyBuilder builder, final StreamsConfig config) {
+        this.config = config;
+        builders = new TreeMap<>();
+        if (builder.hasNamedTopology()) {
+            builders.put(builder.namedTopology(), builder);
+        } else {
+            builders.put(UNNAMED_TOPOLOGY, builder);
+        }
+    }
+
+    public TopologyMetadata(final SortedMap<String, InternalTopologyBuilder> builders, final StreamsConfig config) {
+        this.config = config;
+        this.builders = builders;
+        if (builders.isEmpty()) {
+            log.debug("Building KafkaStreams app with no empty topology");
+        }
+    }
+
+    public int getNumStreamThreads(final StreamsConfig config) {
+        final int configuredNumStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+
+        // If the application uses named topologies, it's possible to start up with no topologies at all and only add them later
+        if (builders.isEmpty()) {
+            if (configuredNumStreamThreads != 0) {
+                log.info("Overriding number of StreamThreads to zero for empty topology");
+            }
+            return 0;
+        }
+
+        // If there are topologies but they are all empty, this indicates a bug in user code
+        if (hasNoNonGlobalTopology() && !hasGlobalTopology()) {
+            log.error("Topology with no input topics will create no stream threads and no global thread.");
+            throw new TopologyException("Topology has no stream threads and no global threads, " +
+                                            "must subscribe to at least one source topic or global table.");
+        }
+
+        // Lastly we check for an empty non-global topology and override the threads to zero if set otherwise
+        if (configuredNumStreamThreads != 0 && hasNoNonGlobalTopology()) {
+            log.info("Overriding number of StreamThreads to zero for global-only topology");
+            return 0;
+        }
+
+        return configuredNumStreamThreads;
+    }
+
+    public boolean hasNamedTopologies() {
+        // This includes the case of starting up with no named topologies at all
+        return !builders.containsKey(UNNAMED_TOPOLOGY);
+    }
+
+    public boolean hasGlobalTopology() {
+        return evaluateConditionIsTrueForAnyBuilders(InternalTopologyBuilder::hasGlobalStores);
+    }
+
+    public boolean hasNoNonGlobalTopology() {
+        return evaluateConditionIsTrueForAnyBuilders(InternalTopologyBuilder::hasNoNonGlobalTopology);
+    }
+
+    public boolean hasPersistentStores() {
+        // If the app is using named topologies, there may not be any persistent state when it first starts up
+        // but a new NamedTopology may introduce it later, so we must return true
+        if (hasNamedTopologies()) {
+            return true;
+        }
+        return evaluateConditionIsTrueForAnyBuilders(InternalTopologyBuilder::hasPersistentStores);
+    }
+
+    public boolean hasStore(final String name) {
+        return evaluateConditionIsTrueForAnyBuilders(b -> b.hasStore(name));
+    }
+
+    public boolean hasOffsetResetOverrides() {
+        return evaluateConditionIsTrueForAnyBuilders(InternalTopologyBuilder::hasOffsetResetOverrides);
+    }
+
+    public OffsetResetStrategy offsetResetStrategy(final String topic) {
+        for (final InternalTopologyBuilder builder : builders.values()) {
+            final OffsetResetStrategy resetStrategy = builder.offsetResetStrategy(topic);
+            if (resetStrategy != null) {
+                return resetStrategy;
+            }
+        }
+        return null;
+    }
+
+    Collection<String> sourceTopicCollection() {
+        final List<String> sourceTopics = new ArrayList<>();
+        applyToEachBuilder(b -> sourceTopics.addAll(b.sourceTopicCollection()));
+        return sourceTopics;
+    }
+
+    Pattern sourceTopicPattern() {
+        final StringBuilder patternBuilder = new StringBuilder();
+
+        applyToEachBuilder(b -> {
+            final String patternString = b.sourceTopicsPatternString();
+            if (patternString.length() > 0) {
+                patternBuilder.append(patternString).append("|");
+            }
+        });
+
+        if (patternBuilder.length() > 0) {
+            patternBuilder.setLength(patternBuilder.length() - 1);
+            return Pattern.compile(patternBuilder.toString());
+        } else {
+            return EMPTY_ZERO_LENGTH_PATTERN;
+        }
+    }
+
+    public boolean usesPatternSubscription() {
+        return evaluateConditionIsTrueForAnyBuilders(InternalTopologyBuilder::usesPatternSubscription);
+    }
+
+    // Can be empty if app is started up with no Named Topologies, in order to add them on later
+    public boolean isEmpty() {
+        return builders.isEmpty();
+    }
+
+    public String topologyDescription() {

Review comment:
       nit: rename to `topologyDescriptionString()`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -1042,13 +1053,23 @@ private void buildProcessorNode(final Map<String, ProcessorNode<?, ?, ?, ?>> pro
 
                     // remember the changelog topic if this state store is change-logging enabled
                     if (stateStoreFactory.loggingEnabled() && !storeToChangelogTopic.containsKey(stateStoreName)) {
-                        final String changelogTopic = ProcessorStateManager.storeChangelogTopic(applicationId, stateStoreName);
+                        final String changelogTopic =
+                            ProcessorStateManager.storeChangelogTopic(applicationId, stateStoreName, namedTopology);
                         storeToChangelogTopic.put(stateStoreName, changelogTopic);
                         changelogTopicToStore.put(changelogTopic, stateStoreName);
                     }
-                    stateStoreMap.put(stateStoreName, stateStoreFactory.build());
+                    final StateStore store = stateStoreFactory.build();
+                    stateStoreMap.put(stateStoreName, store);
+                    if (store.persistent()) {

Review comment:
       We can define `store` before the if-else and move this check out of the if-else condition.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -1065,14 +1086,22 @@ private void buildProcessorNode(final Map<String, ProcessorNode<?, ?, ?, ?>> pro
         return Collections.unmodifiableMap(globalStateStores);
     }
 
-    public Set<String> allStateStoreName() {
+    public Set<String> allStateStoreNames() {
         Objects.requireNonNull(applicationId, "topology has not completed optimization");
 
         final Set<String> allNames = new HashSet<>(stateFactories.keySet());
         allNames.addAll(globalStateStores.keySet());
         return Collections.unmodifiableSet(allNames);
     }
 
+    public boolean hasStore(final String name) {
+        return stateFactories.containsKey(name) || globalStateStores.containsKey(name);
+    }
+
+    public boolean hasPersistentStores() {

Review comment:
       It seems we are moving this check/flag earlier, from the topology to the topology builder. Is there a motivation for this?

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1296,6 +1275,14 @@ public synchronized void start() throws IllegalStateException, StreamsException
         } else {
             throw new IllegalStateException("The client is either already started or already stopped, cannot re-start");
         }
+
+        if (topologyMetadata.isEmpty()) {
+            if (setState(State.RUNNING)) {
+                log.debug("Transitioning directly to RUNNING for app with no named topologies");
+            } else {
+                throw new IllegalStateException("Unexpected error in transitioning empty KafkaStreams to RUNNING");

Review comment:
       nit: maybe say "KafkaStreams with empty processing topology" instead of "empty KafkaStreams" in this message.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -16,10 +16,294 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO KAFKA-12648:
+//  1) synchronize on these methods instead of individual InternalTopologyBuilder methods, where applicable
 
 public class TopologyMetadata {
-    //TODO KAFKA-12648: the TopologyMetadata class is filled in by Pt. 2 (PR #10683)
+    private final Logger log = LoggerFactory.getLogger(TopologyMetadata.class);
+
+    // the '_' character is not allowed for topology names, thus it's safe to use to indicate that it's not a named topology
+    private static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";
+    private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile("");
+
+    private final StreamsConfig config;
+    private final SortedMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability
+
+    private ProcessorTopology globalTopology;
+    private Map<String, StateStore> globalStateStores = new HashMap<>();
+    final Set<String> allInputTopics = new HashSet<>();
+
+    public TopologyMetadata(final InternalTopologyBuilder builder, final StreamsConfig config) {
+        this.config = config;
+        builders = new TreeMap<>();
+        if (builder.hasNamedTopology()) {
+            builders.put(builder.namedTopology(), builder);
+        } else {
+            builders.put(UNNAMED_TOPOLOGY, builder);
+        }
+    }
+
+    public TopologyMetadata(final SortedMap<String, InternalTopologyBuilder> builders, final StreamsConfig config) {
+        this.config = config;
+        this.builders = builders;
+        if (builders.isEmpty()) {
+            log.debug("Building KafkaStreams app with no empty topology");
+        }
+    }
+
+    public int getNumStreamThreads(final StreamsConfig config) {
+        final int configuredNumStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+
+        // If the application uses named topologies, it's possible to start up with no topologies at all and only add them later
+        if (builders.isEmpty()) {
+            if (configuredNumStreamThreads != 0) {
+                log.info("Overriding number of StreamThreads to zero for empty topology");
+            }
+            return 0;
+        }
+
+        // If there are topologies but they are all empty, this indicates a bug in user code
+        if (hasNoNonGlobalTopology() && !hasGlobalTopology()) {
+            log.error("Topology with no input topics will create no stream threads and no global thread.");
+            throw new TopologyException("Topology has no stream threads and no global threads, " +
+                                            "must subscribe to at least one source topic or global table.");
+        }
+
+        // Lastly we check for an empty non-global topology and override the threads to zero if set otherwise
+        if (configuredNumStreamThreads != 0 && hasNoNonGlobalTopology()) {
+            log.info("Overriding number of StreamThreads to zero for global-only topology");
+            return 0;
+        }
+
+        return configuredNumStreamThreads;
+    }
+
+    public boolean hasNamedTopologies() {
+        // This includes the case of starting up with no named topologies at all
+        return !builders.containsKey(UNNAMED_TOPOLOGY);
+    }
+
+    public boolean hasGlobalTopology() {
+        return evaluateConditionIsTrueForAnyBuilders(InternalTopologyBuilder::hasGlobalStores);
+    }
+
+    public boolean hasNoNonGlobalTopology() {
+        return evaluateConditionIsTrueForAnyBuilders(InternalTopologyBuilder::hasNoNonGlobalTopology);
+    }
+
+    public boolean hasPersistentStores() {
+        // If the app is using named topologies, there may not be any persistent state when it first starts up
+        // but a new NamedTopology may introduce it later, so we must return true
+        if (hasNamedTopologies()) {
+            return true;
+        }
+        return evaluateConditionIsTrueForAnyBuilders(InternalTopologyBuilder::hasPersistentStores);
+    }
+
+    public boolean hasStore(final String name) {
+        return evaluateConditionIsTrueForAnyBuilders(b -> b.hasStore(name));
+    }
+
+    public boolean hasOffsetResetOverrides() {
+        return evaluateConditionIsTrueForAnyBuilders(InternalTopologyBuilder::hasOffsetResetOverrides);
+    }
+
+    public OffsetResetStrategy offsetResetStrategy(final String topic) {
+        for (final InternalTopologyBuilder builder : builders.values()) {
+            final OffsetResetStrategy resetStrategy = builder.offsetResetStrategy(topic);
+            if (resetStrategy != null) {
+                return resetStrategy;
+            }
+        }
+        return null;
+    }
+
+    Collection<String> sourceTopicCollection() {
+        final List<String> sourceTopics = new ArrayList<>();
+        applyToEachBuilder(b -> sourceTopics.addAll(b.sourceTopicCollection()));
+        return sourceTopics;
+    }
+
+    Pattern sourceTopicPattern() {
+        final StringBuilder patternBuilder = new StringBuilder();
+
+        applyToEachBuilder(b -> {
+            final String patternString = b.sourceTopicsPatternString();
+            if (patternString.length() > 0) {
+                patternBuilder.append(patternString).append("|");
+            }
+        });
+
+        if (patternBuilder.length() > 0) {
+            patternBuilder.setLength(patternBuilder.length() - 1);
+            return Pattern.compile(patternBuilder.toString());
+        } else {
+            return EMPTY_ZERO_LENGTH_PATTERN;
+        }
+    }
+
+    public boolean usesPatternSubscription() {
+        return evaluateConditionIsTrueForAnyBuilders(InternalTopologyBuilder::usesPatternSubscription);
+    }
+
+    // Can be empty if app is started up with no Named Topologies, in order to add them on later
+    public boolean isEmpty() {
+        return builders.isEmpty();
+    }
+
+    public String topologyDescription() {
+        if (isEmpty()) {
+            return "";
+        }
+        final StringBuilder sb = new StringBuilder();
+
+        applyToEachBuilder(b -> {
+            sb.append(b.describe().toString());
+        });
+
+        return sb.toString();
+    }
+
+    public final void buildAndRewriteTopology() {
+        applyToEachBuilder(builder -> {
+            builder.rewriteTopology(config);
+            builder.buildTopology();
+
+            // As we go, check each topology for overlap in the set of input topics/patterns
+            final int numInputTopics = allInputTopics.size();
+            final List<String> inputTopics = builder.fullSourceTopicNames();
+            final Collection<String> inputPatterns = builder.allSourcePatternStrings();
+
+            final int numNewInputTopics = inputTopics.size() + inputPatterns.size();
+            allInputTopics.addAll(inputTopics);
+            allInputTopics.addAll(inputPatterns);
+            if (allInputTopics.size() != numInputTopics + numNewInputTopics) {
+                inputTopics.retainAll(allInputTopics);
+                inputPatterns.retainAll(allInputTopics);
+                inputTopics.addAll(inputPatterns);
+                log.error("Tried to add the NamedTopology {} but it had overlap with other input topics: {}", builder.namedTopology(), inputTopics);
+                throw new TopologyException("Named Topologies may not subscribe to the same input topics or patterns");
+            }
+
+            final ProcessorTopology globalTopology = builder.buildGlobalStateTopology();
+            if (globalTopology != null) {
+                if (builder.namedTopology() != null) {
+                    throw new IllegalStateException("Global state stores are not supported with Named Topologies");
+                } else if (this.globalTopology == null) {
+                    this.globalTopology = globalTopology;
+                } else {
+                    throw new IllegalStateException("Topology builder had global state, but global topology has already been set");
+                }
+            }
+            globalStateStores.putAll(builder.globalStateStores());
+        });
+    }
+
+    public ProcessorTopology buildSubtopology(final TaskId task) {
+        return lookupBuilderForTask(task).buildSubtopology(task.subtopology());
+    }
+
+    public ProcessorTopology globalTaskTopology() {
+        if (hasNamedTopologies()) {
+            throw new IllegalStateException("Global state stores are not supported with Named Topologies");
+        }
+        return globalTopology;
+    }
+
+    public Map<String, StateStore> globalStateStores() {
+        return globalStateStores;
+    }
+
+    public Map<String, List<String>> stateStoreNameToSourceTopics() {
+        final Map<String, List<String>> stateStoreNameToSourceTopics = new HashMap<>();
+        applyToEachBuilder(b -> stateStoreNameToSourceTopics.putAll(b.stateStoreNameToSourceTopics()));
+        return stateStoreNameToSourceTopics;
+    }
+
+    public String getStoreForChangelogTopic(final String topicName) {
+        for (final InternalTopologyBuilder builder : builders.values()) {
+            final String store = builder.getStoreForChangelogTopic(topicName);
+            if (store != null) {
+                return store;
+            }
+        }
+        log.warn("Unable to locate any store for topic {}", topicName);
+        return "";

Review comment:
       Why not return `null` here?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -16,10 +16,294 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO KAFKA-12648:
+//  1) synchronize on these methods instead of individual InternalTopologyBuilder methods, where applicable
 
 public class TopologyMetadata {
-    //TODO KAFKA-12648: the TopologyMetadata class is filled in by Pt. 2 (PR #10683)
+    private final Logger log = LoggerFactory.getLogger(TopologyMetadata.class);
+
+    // the '_' character is not allowed for topology names, thus it's safe to use to indicate that it's not a named topology
+    private static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";
+    private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile("");
+
+    private final StreamsConfig config;
+    private final SortedMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability
+
+    private ProcessorTopology globalTopology;
+    private Map<String, StateStore> globalStateStores = new HashMap<>();
+    final Set<String> allInputTopics = new HashSet<>();
+
+    public TopologyMetadata(final InternalTopologyBuilder builder, final StreamsConfig config) {
+        this.config = config;
+        builders = new TreeMap<>();
+        if (builder.hasNamedTopology()) {
+            builders.put(builder.namedTopology(), builder);
+        } else {
+            builders.put(UNNAMED_TOPOLOGY, builder);
+        }
+    }
+
+    public TopologyMetadata(final SortedMap<String, InternalTopologyBuilder> builders, final StreamsConfig config) {
+        this.config = config;
+        this.builders = builders;
+        if (builders.isEmpty()) {
+            log.debug("Building KafkaStreams app with no empty topology");
+        }
+    }
+
+    public int getNumStreamThreads(final StreamsConfig config) {
+        final int configuredNumStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+
+        // If the application uses named topologies, it's possible to start up with no topologies at all and only add them later
+        if (builders.isEmpty()) {
+            if (configuredNumStreamThreads != 0) {
+                log.info("Overriding number of StreamThreads to zero for empty topology");
+            }
+            return 0;
+        }
+
+        // If there are topologies but they are all empty, this indicates a bug in user code
+        if (hasNoNonGlobalTopology() && !hasGlobalTopology()) {

Review comment:
       Based on the logic of `evaluateConditionIsTrueForAnyBuilders`, if `builders.isEmpty()` is true it should always return false, right?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -345,8 +343,17 @@ private SinkNodeFactory(final String name,
         }
     }
 
+    public void setTopologyName(final String namedTopology) {
+        Objects.requireNonNull(namedTopology, "named topology can't be null");
+        if (this.namedTopology != null) {
+            log.error("Tried to reset the namedTopology to {} but it was already set to {}", namedTopology, this.namedTopology);
+            throw new IllegalStateException("NamedTopology has already been set to " + this.namedTopology);
+        }
+        this.namedTopology = namedTopology;
+    }
+
     // public for testing only
-    public synchronized final InternalTopologyBuilder setApplicationId(final String applicationId) {

Review comment:
       Why remove synchronization here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org