Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/09 16:06:04 UTC

[GitHub] [kafka] abbccdda commented on a change in pull request #8832: KAFKA-9377: Refactor StreamsPartitionAssignor Repartition Count Logic

abbccdda commented on a change in pull request #8832:
URL: https://github.com/apache/kafka/pull/8832#discussion_r436913810



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -147,6 +147,46 @@ public String toString() {
         }
     }
 
+    /**
+     * TopicNode is the a topic node abstraction for graph built with TopicNode and TopicsInfo, the graph is useful
+     * when in certain cases traverse is needed. For example, method setRepartitionTopicMetadataNumberOfPartitions
+     * internally do a DFS search along with the graph.
+     *
+     TopicNode("t1")      TopicNode("t2")                                    TopicNode("t6")             TopicNode("t7")
+                \           /                                                            \                           /
+                  TopicsInfo(source = (t1,t2), sink = (t3,t4))                           TopicsInfo(source = (t6,t7), sink = (t4))
+                                /           \                                                                        /
+                             /                 \                                                          /
+                          /                        \                                           /
+                      /                                \                           /
+                 /                                       \            /
+     TopicNode("t3")                                     TopicNode("t4")
+            \
+     TopicsInfo(source = (t3), sink = ())
+
+     t3 = max(t1,t2)
+     t4 = max(max(t1,t2), max(t6,t7))
+     */
+    private static class TopicNode {
+        public final String topicName;
+        public final Set<TopicsInfo> upStreams; // upStream TopicsInfo's sinkTopics contains this
+        public final Set<TopicsInfo> downStreams; // downStreams TopicsInfo's sourceTopics contains this

Review comment:
       I don't see this class being used anywhere.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -513,66 +553,94 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion,
      */
     private void setRepartitionTopicMetadataNumberOfPartitions(final Map<String, InternalTopicConfig> repartitionTopicMetadata,
                                                                final Map<Integer, TopicsInfo> topicGroups,
-                                                               final Cluster metadata) {
-        boolean numPartitionsNeeded;
-        do {
-            numPartitionsNeeded = false;
-
-            for (final TopicsInfo topicsInfo : topicGroups.values()) {
-                for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
-                    final Optional<Integer> maybeNumPartitions = repartitionTopicMetadata.get(topicName)
-                                                                     .numberOfPartitions();
-                    Integer numPartitions = null;
-
-                    if (!maybeNumPartitions.isPresent()) {
-                        // try set the number of partitions for this repartition topic if it is not set yet
-                        for (final TopicsInfo otherTopicsInfo : topicGroups.values()) {
-                            final Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
-
-                            if (otherSinkTopics.contains(topicName)) {
-                                // if this topic is one of the sink topics of this topology,
-                                // use the maximum of all its source topic partitions as the number of partitions
-                                for (final String sourceTopicName : otherTopicsInfo.sourceTopics) {
-                                    Integer numPartitionsCandidate = null;
-                                    // It is possible the sourceTopic is another internal topic, i.e,
-                                    // map().join().join(map())
-                                    if (repartitionTopicMetadata.containsKey(sourceTopicName)) {
-                                        if (repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()) {
-                                            numPartitionsCandidate =
-                                                repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get();
-                                        }
-                                    } else {
-                                        final Integer count = metadata.partitionCountForTopic(sourceTopicName);
-                                        if (count == null) {
-                                            throw new IllegalStateException(
-                                                "No partition count found for source topic "
-                                                    + sourceTopicName
-                                                    + ", but it should have been."
-                                            );
-                                        }
-                                        numPartitionsCandidate = count;
-                                    }
-
-                                    if (numPartitionsCandidate != null) {
-                                        if (numPartitions == null || numPartitionsCandidate > numPartitions) {
-                                            numPartitions = numPartitionsCandidate;
-                                        }
-                                    }
-                                }
-                            }
-                        }
-
-                        // if we still have not found the right number of partitions,
-                        // another iteration is needed
-                        if (numPartitions == null) {
-                            numPartitionsNeeded = true;
-                        } else {
-                            repartitionTopicMetadata.get(topicName).setNumberOfPartitions(numPartitions);
-                        }
-                    }
+                                                                final Cluster metadata) {
+        final Set<String> allRepartitionSourceTopics = new HashSet<>();
+        final Map<String, TopicNode> builtTopicNodes = new HashMap<>();
+        // 1.  Build a graph contains the TopicsInfo and TopicsNode
+        for (final TopicsInfo topicsInfo : topicGroups.values()) {
+            for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {

Review comment:
       This loop could be replaced with `addAll`:
   `allRepartitionSourceTopics.addAll(topicsInfo.repartitionSourceTopics.keySet());`
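
A minimal sketch of why the two forms are interchangeable, assuming `repartitionSourceTopics` is a `Map` keyed by topic name as in the diff. `AddAllSketch`, `sampleGroups`, `viaLoop`, and `viaAddAll` are hypothetical names for illustration only and are not part of the PR.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class AddAllSketch {

    // Hypothetical sample data: each map stands in for one TopicsInfo.repartitionSourceTopics.
    static List<Map<String, Integer>> sampleGroups() {
        final Map<String, Integer> first = new HashMap<>();
        first.put("t3", 0);
        first.put("t4", 0);
        final Map<String, Integer> second = new HashMap<>();
        second.put("t4", 0);
        second.put("t5", 0);
        return Arrays.asList(first, second);
    }

    // Shape of the code under review: copy the keys one at a time.
    static Set<String> viaLoop(final List<Map<String, Integer>> groups) {
        final Set<String> all = new HashSet<>();
        for (final Map<String, Integer> repartitionSourceTopics : groups) {
            for (final String topicName : repartitionSourceTopics.keySet()) {
                all.add(topicName);
            }
        }
        return all;
    }

    // Suggested shape: one addAll call per group replaces the inner loop.
    static Set<String> viaAddAll(final List<Map<String, Integer>> groups) {
        final Set<String> all = new HashSet<>();
        for (final Map<String, Integer> repartitionSourceTopics : groups) {
            all.addAll(repartitionSourceTopics.keySet());
        }
        return all;
    }

    public static void main(final String[] args) {
        // Both variants collect the same set of topic names.
        System.out.println(viaLoop(sampleGroups()).equals(viaAddAll(sampleGroups())));
    }
}
```

Both methods build the same set; the second only removes the hand-written inner loop.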
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -147,6 +147,46 @@ public String toString() {
         }
     }
 
+    /**
+     * TopicNode is the a topic node abstraction for graph built with TopicNode and TopicsInfo, the graph is useful

Review comment:
       The phrase `with TopicNode and TopicsInfo` reads oddly in the Javadoc that defines `TopicNode` itself; maybe use something like `with topic information associated with each node`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -513,66 +553,94 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion,
      */
     private void setRepartitionTopicMetadataNumberOfPartitions(final Map<String, InternalTopicConfig> repartitionTopicMetadata,
                                                                final Map<Integer, TopicsInfo> topicGroups,
-                                                               final Cluster metadata) {
-        boolean numPartitionsNeeded;
-        do {
-            numPartitionsNeeded = false;
-
-            for (final TopicsInfo topicsInfo : topicGroups.values()) {
-                for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
-                    final Optional<Integer> maybeNumPartitions = repartitionTopicMetadata.get(topicName)
-                                                                     .numberOfPartitions();
-                    Integer numPartitions = null;
-
-                    if (!maybeNumPartitions.isPresent()) {
-                        // try set the number of partitions for this repartition topic if it is not set yet
-                        for (final TopicsInfo otherTopicsInfo : topicGroups.values()) {
-                            final Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
-
-                            if (otherSinkTopics.contains(topicName)) {
-                                // if this topic is one of the sink topics of this topology,
-                                // use the maximum of all its source topic partitions as the number of partitions
-                                for (final String sourceTopicName : otherTopicsInfo.sourceTopics) {
-                                    Integer numPartitionsCandidate = null;
-                                    // It is possible the sourceTopic is another internal topic, i.e,
-                                    // map().join().join(map())
-                                    if (repartitionTopicMetadata.containsKey(sourceTopicName)) {
-                                        if (repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()) {
-                                            numPartitionsCandidate =
-                                                repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get();
-                                        }
-                                    } else {
-                                        final Integer count = metadata.partitionCountForTopic(sourceTopicName);
-                                        if (count == null) {
-                                            throw new IllegalStateException(
-                                                "No partition count found for source topic "
-                                                    + sourceTopicName
-                                                    + ", but it should have been."
-                                            );
-                                        }
-                                        numPartitionsCandidate = count;
-                                    }
-
-                                    if (numPartitionsCandidate != null) {
-                                        if (numPartitions == null || numPartitionsCandidate > numPartitions) {
-                                            numPartitions = numPartitionsCandidate;
-                                        }
-                                    }
-                                }
-                            }
-                        }
-
-                        // if we still have not found the right number of partitions,
-                        // another iteration is needed
-                        if (numPartitions == null) {
-                            numPartitionsNeeded = true;
-                        } else {
-                            repartitionTopicMetadata.get(topicName).setNumberOfPartitions(numPartitions);
-                        }
-                    }
+                                                                final Cluster metadata) {
+        final Set<String> allRepartitionSourceTopics = new HashSet<>();
+        final Map<String, TopicNode> builtTopicNodes = new HashMap<>();
+        // 1.  Build a graph contains the TopicsInfo and TopicsNode
+        for (final TopicsInfo topicsInfo : topicGroups.values()) {
+            for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
+                allRepartitionSourceTopics.add(topicName);
+            }
+            for (final String sourceTopic : topicsInfo.sourceTopics) {
+                if (builtTopicNodes.containsKey(sourceTopic)) {
+                    builtTopicNodes.get(sourceTopic).addDownStreamTopicsInfo(topicsInfo);
+                } else {
+                    final TopicNode topicNode = new TopicNode(sourceTopic);
+                    topicNode.addDownStreamTopicsInfo(topicsInfo);
+                    builtTopicNodes.put(sourceTopic, topicNode);
+                }
+            }
+
+            for (final String sinkTopic : topicsInfo.sinkTopics) {
+                if (builtTopicNodes.containsKey(sinkTopic)) {
+                    builtTopicNodes.get(sinkTopic).addUpStreamTopicsInfo(topicsInfo);
+                } else {
+                    final TopicNode topicNode = new TopicNode(sinkTopic);
+                    topicNode.addUpStreamTopicsInfo(topicsInfo);
+                    builtTopicNodes.put(sinkTopic, topicNode);
                 }
             }
-        } while (numPartitionsNeeded);
+        }
+
+        // 2. Use DFS along with memoization to calc repartition number of all repartitionSourceTopics
+        for (final String topic : allRepartitionSourceTopics) {
+            calcRepartitionNumForTopic(topic, repartitionTopicMetadata, metadata, builtTopicNodes, new HashMap<TopicsInfo, Integer>());
+        }
+    }
+
+    private int calcRepartitionNumForTopic(final String topic,
+                                           final Map<String, InternalTopicConfig> repartitionTopicMetadata,
+                                           final Cluster metadata,
+                                           final Map<String, TopicNode> builtTopicNodes,
+                                           final Map<TopicsInfo, Integer> topicsInfoNumberOfPartitions) {

Review comment:
       Do we need `topicsInfoNumberOfPartitions` as a parameter? A new map is passed in on every call, so nothing memoized in it carries over between calls.
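
One possible way to address this, sketched under stated assumptions: keep a single memo map alive across all top-level calls instead of passing a fresh one each time. This is not the PR's code. `RepartitionCountSketch`, `Group` (a simplified stand-in for `TopicsInfo`), `knownCounts`, and the partition counts are all hypothetical, and the sketch assumes the topic graph is acyclic. The sample data mirrors the graph drawn in the `TopicNode` Javadoc.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class RepartitionCountSketch {

    // Hypothetical, simplified stand-in for TopicsInfo: only source and sink topic names.
    static final class Group {
        final Set<String> sources;
        final Set<String> sinks;

        Group(final List<String> sources, final List<String> sinks) {
            this.sources = new HashSet<>(sources);
            this.sinks = new HashSet<>(sinks);
        }
    }

    private final List<Group> groups;
    // Partition counts that are already known; computed counts are stored here too.
    private final Map<String, Integer> knownCounts;
    // Created once and shared by every call, so results carry over between topics.
    private final Map<Group, Integer> groupMemo = new HashMap<>();
    // Counts how many groups were actually evaluated (memo misses).
    int groupEvaluations = 0;

    RepartitionCountSketch(final List<Group> groups, final Map<String, Integer> knownCounts) {
        this.groups = groups;
        this.knownCounts = new HashMap<>(knownCounts);
    }

    // Partition count of a topic: known count, else max over the groups that write to it.
    int partitionsForTopic(final String topic) {
        final Integer known = knownCounts.get(topic);
        if (known != null) {
            return known;
        }
        int max = 0;
        boolean hasUpstream = false;
        for (final Group group : groups) {
            if (group.sinks.contains(topic)) {
                hasUpstream = true;
                max = Math.max(max, partitionsForGroup(group));
            }
        }
        if (!hasUpstream) {
            throw new IllegalStateException("No partition count found for topic " + topic);
        }
        knownCounts.put(topic, max);
        return max;
    }

    // Partition count of a group: max over its source topics, memoized per group.
    int partitionsForGroup(final Group group) {
        final Integer memoized = groupMemo.get(group);
        if (memoized != null) {
            return memoized;
        }
        groupEvaluations++;
        int max = 0;
        for (final String source : group.sources) {
            max = Math.max(max, partitionsForTopic(source));
        }
        groupMemo.put(group, max);
        return max;
    }

    // Graph from the Javadoc: (t1,t2)->(t3,t4), (t6,t7)->(t4), (t3)->().
    static RepartitionCountSketch sample() {
        final Map<String, Integer> counts = new HashMap<>();
        counts.put("t1", 4);
        counts.put("t2", 2);
        counts.put("t6", 8);
        counts.put("t7", 1);
        final List<Group> groups = Arrays.asList(
            new Group(Arrays.asList("t1", "t2"), Arrays.asList("t3", "t4")),
            new Group(Arrays.asList("t6", "t7"), Arrays.asList("t4")),
            new Group(Arrays.asList("t3"), Collections.<String>emptyList()));
        return new RepartitionCountSketch(groups, counts);
    }

    public static void main(final String[] args) {
        final RepartitionCountSketch sketch = sample();
        System.out.println("t3 = " + sketch.partitionsForTopic("t3"));
        System.out.println("t4 = " + sketch.partitionsForTopic("t4"));
    }
}
```

With the shared map, the group feeding both `t3` and `t4` is evaluated once; with a new map per top-level call it would be evaluated again when `t4` is resolved.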

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -513,66 +553,94 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion,
      */
     private void setRepartitionTopicMetadataNumberOfPartitions(final Map<String, InternalTopicConfig> repartitionTopicMetadata,
                                                                final Map<Integer, TopicsInfo> topicGroups,
-                                                               final Cluster metadata) {
-        boolean numPartitionsNeeded;
-        do {
-            numPartitionsNeeded = false;
-
-            for (final TopicsInfo topicsInfo : topicGroups.values()) {
-                for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
-                    final Optional<Integer> maybeNumPartitions = repartitionTopicMetadata.get(topicName)
-                                                                     .numberOfPartitions();
-                    Integer numPartitions = null;
-
-                    if (!maybeNumPartitions.isPresent()) {
-                        // try set the number of partitions for this repartition topic if it is not set yet
-                        for (final TopicsInfo otherTopicsInfo : topicGroups.values()) {
-                            final Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
-
-                            if (otherSinkTopics.contains(topicName)) {
-                                // if this topic is one of the sink topics of this topology,
-                                // use the maximum of all its source topic partitions as the number of partitions
-                                for (final String sourceTopicName : otherTopicsInfo.sourceTopics) {
-                                    Integer numPartitionsCandidate = null;
-                                    // It is possible the sourceTopic is another internal topic, i.e,
-                                    // map().join().join(map())
-                                    if (repartitionTopicMetadata.containsKey(sourceTopicName)) {
-                                        if (repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()) {
-                                            numPartitionsCandidate =
-                                                repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get();
-                                        }
-                                    } else {
-                                        final Integer count = metadata.partitionCountForTopic(sourceTopicName);
-                                        if (count == null) {
-                                            throw new IllegalStateException(
-                                                "No partition count found for source topic "
-                                                    + sourceTopicName
-                                                    + ", but it should have been."
-                                            );
-                                        }
-                                        numPartitionsCandidate = count;
-                                    }
-
-                                    if (numPartitionsCandidate != null) {
-                                        if (numPartitions == null || numPartitionsCandidate > numPartitions) {
-                                            numPartitions = numPartitionsCandidate;
-                                        }
-                                    }
-                                }
-                            }
-                        }
-
-                        // if we still have not found the right number of partitions,
-                        // another iteration is needed
-                        if (numPartitions == null) {
-                            numPartitionsNeeded = true;
-                        } else {
-                            repartitionTopicMetadata.get(topicName).setNumberOfPartitions(numPartitions);
-                        }
-                    }
+                                                                final Cluster metadata) {
+        final Set<String> allRepartitionSourceTopics = new HashSet<>();
+        final Map<String, TopicNode> builtTopicNodes = new HashMap<>();
+        // 1.  Build a graph contains the TopicsInfo and TopicsNode
+        for (final TopicsInfo topicsInfo : topicGroups.values()) {
+            for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
+                allRepartitionSourceTopics.add(topicName);
+            }
+            for (final String sourceTopic : topicsInfo.sourceTopics) {
+                if (builtTopicNodes.containsKey(sourceTopic)) {
+                    builtTopicNodes.get(sourceTopic).addDownStreamTopicsInfo(topicsInfo);
+                } else {
+                    final TopicNode topicNode = new TopicNode(sourceTopic);
+                    topicNode.addDownStreamTopicsInfo(topicsInfo);
+                    builtTopicNodes.put(sourceTopic, topicNode);
+                }
+            }
+
+            for (final String sinkTopic : topicsInfo.sinkTopics) {
+                if (builtTopicNodes.containsKey(sinkTopic)) {
+                    builtTopicNodes.get(sinkTopic).addUpStreamTopicsInfo(topicsInfo);
+                } else {
+                    final TopicNode topicNode = new TopicNode(sinkTopic);
+                    topicNode.addUpStreamTopicsInfo(topicsInfo);
+                    builtTopicNodes.put(sinkTopic, topicNode);
                 }
             }
-        } while (numPartitionsNeeded);
+        }
+
+        // 2. Use DFS along with memoization to calc repartition number of all repartitionSourceTopics
+        for (final String topic : allRepartitionSourceTopics) {
+            calcRepartitionNumForTopic(topic, repartitionTopicMetadata, metadata, builtTopicNodes, new HashMap<TopicsInfo, Integer>());
+        }
+    }
+
+    private int calcRepartitionNumForTopic(final String topic,
+                                           final Map<String, InternalTopicConfig> repartitionTopicMetadata,
+                                           final Cluster metadata,
+                                           final Map<String, TopicNode> builtTopicNodes,
+                                           final Map<TopicsInfo, Integer> topicsInfoNumberOfPartitions) {
+
+        if (repartitionTopicMetadata.containsKey(topic)) {
+            final Optional<Integer> maybeNumberPartitions = repartitionTopicMetadata.get(topic).numberOfPartitions();
+            // if numberOfPartitions already calculated, return directly
+            if (maybeNumberPartitions.isPresent()) {
+                return maybeNumberPartitions.get();
+            } else {
+                // else calculate the max numRepartitions of its upStream TopicsInfo and set the repartitionTopicMetadata for memoization before return
+                final TopicNode topicNode = builtTopicNodes.get(topic);
+                Integer maxNumberPartitions = 0;
+                for (final TopicsInfo upstream : topicNode.upStreams) {
+                    final Integer upStreamRepartitionNum = calcRepartitionNumForTopicInfo(upstream, repartitionTopicMetadata,
+                            metadata, builtTopicNodes, topicsInfoNumberOfPartitions);
+                    maxNumberPartitions = upStreamRepartitionNum > maxNumberPartitions ? upStreamRepartitionNum : maxNumberPartitions;
+                }
+                repartitionTopicMetadata.get(topic).setNumberOfPartitions(maxNumberPartitions);
+                return maxNumberPartitions;
+            }
+        } else {
+            final Integer count = metadata.partitionCountForTopic(topic);
+            if (count == null) {
+                throw new IllegalStateException(
+                        "No partition count found for source topic "
+                                + topic
+                                + ", but it should have been."
+                );
+            }
+            return count;
+        }
+    }
+
+    private int calcRepartitionNumForTopicInfo(final TopicsInfo topicsInfo,

Review comment:
       `calcRepartitionNumForTopic` and `calcRepartitionNumForTopicInfo` are hard to tell apart; it would be better to give them more distinct names.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -513,66 +553,94 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion,
      */
     private void setRepartitionTopicMetadataNumberOfPartitions(final Map<String, InternalTopicConfig> repartitionTopicMetadata,
                                                                final Map<Integer, TopicsInfo> topicGroups,
-                                                               final Cluster metadata) {
-        boolean numPartitionsNeeded;
-        do {
-            numPartitionsNeeded = false;
-
-            for (final TopicsInfo topicsInfo : topicGroups.values()) {
-                for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
-                    final Optional<Integer> maybeNumPartitions = repartitionTopicMetadata.get(topicName)
-                                                                     .numberOfPartitions();
-                    Integer numPartitions = null;
-
-                    if (!maybeNumPartitions.isPresent()) {
-                        // try set the number of partitions for this repartition topic if it is not set yet
-                        for (final TopicsInfo otherTopicsInfo : topicGroups.values()) {
-                            final Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
-
-                            if (otherSinkTopics.contains(topicName)) {
-                                // if this topic is one of the sink topics of this topology,
-                                // use the maximum of all its source topic partitions as the number of partitions
-                                for (final String sourceTopicName : otherTopicsInfo.sourceTopics) {
-                                    Integer numPartitionsCandidate = null;
-                                    // It is possible the sourceTopic is another internal topic, i.e,
-                                    // map().join().join(map())
-                                    if (repartitionTopicMetadata.containsKey(sourceTopicName)) {
-                                        if (repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()) {
-                                            numPartitionsCandidate =
-                                                repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get();
-                                        }
-                                    } else {
-                                        final Integer count = metadata.partitionCountForTopic(sourceTopicName);
-                                        if (count == null) {
-                                            throw new IllegalStateException(
-                                                "No partition count found for source topic "
-                                                    + sourceTopicName
-                                                    + ", but it should have been."
-                                            );
-                                        }
-                                        numPartitionsCandidate = count;
-                                    }
-
-                                    if (numPartitionsCandidate != null) {
-                                        if (numPartitions == null || numPartitionsCandidate > numPartitions) {
-                                            numPartitions = numPartitionsCandidate;
-                                        }
-                                    }
-                                }
-                            }
-                        }
-
-                        // if we still have not found the right number of partitions,
-                        // another iteration is needed
-                        if (numPartitions == null) {
-                            numPartitionsNeeded = true;
-                        } else {
-                            repartitionTopicMetadata.get(topicName).setNumberOfPartitions(numPartitions);
-                        }
-                    }
+                                                                final Cluster metadata) {
+        final Set<String> allRepartitionSourceTopics = new HashSet<>();
+        final Map<String, TopicNode> builtTopicNodes = new HashMap<>();
+        // 1.  Build a graph contains the TopicsInfo and TopicsNode
+        for (final TopicsInfo topicsInfo : topicGroups.values()) {
+            for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
+                allRepartitionSourceTopics.add(topicName);
+            }
+            for (final String sourceTopic : topicsInfo.sourceTopics) {
+                if (builtTopicNodes.containsKey(sourceTopic)) {
+                    builtTopicNodes.get(sourceTopic).addDownStreamTopicsInfo(topicsInfo);
+                } else {
+                    final TopicNode topicNode = new TopicNode(sourceTopic);
+                    topicNode.addDownStreamTopicsInfo(topicsInfo);
+                    builtTopicNodes.put(sourceTopic, topicNode);
+                }
+            }
+
+            for (final String sinkTopic : topicsInfo.sinkTopics) {
+                if (builtTopicNodes.containsKey(sinkTopic)) {

Review comment:
       Same here

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -513,66 +553,94 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion,
      */
     private void setRepartitionTopicMetadataNumberOfPartitions(final Map<String, InternalTopicConfig> repartitionTopicMetadata,
                                                                final Map<Integer, TopicsInfo> topicGroups,
-                                                               final Cluster metadata) {
-        boolean numPartitionsNeeded;
-        do {
-            numPartitionsNeeded = false;
-
-            for (final TopicsInfo topicsInfo : topicGroups.values()) {
-                for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
-                    final Optional<Integer> maybeNumPartitions = repartitionTopicMetadata.get(topicName)
-                                                                     .numberOfPartitions();
-                    Integer numPartitions = null;
-
-                    if (!maybeNumPartitions.isPresent()) {
-                        // try set the number of partitions for this repartition topic if it is not set yet
-                        for (final TopicsInfo otherTopicsInfo : topicGroups.values()) {
-                            final Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
-
-                            if (otherSinkTopics.contains(topicName)) {
-                                // if this topic is one of the sink topics of this topology,
-                                // use the maximum of all its source topic partitions as the number of partitions
-                                for (final String sourceTopicName : otherTopicsInfo.sourceTopics) {
-                                    Integer numPartitionsCandidate = null;
-                                    // It is possible the sourceTopic is another internal topic, i.e,
-                                    // map().join().join(map())
-                                    if (repartitionTopicMetadata.containsKey(sourceTopicName)) {
-                                        if (repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()) {
-                                            numPartitionsCandidate =
-                                                repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get();
-                                        }
-                                    } else {
-                                        final Integer count = metadata.partitionCountForTopic(sourceTopicName);
-                                        if (count == null) {
-                                            throw new IllegalStateException(
-                                                "No partition count found for source topic "
-                                                    + sourceTopicName
-                                                    + ", but it should have been."
-                                            );
-                                        }
-                                        numPartitionsCandidate = count;
-                                    }
-
-                                    if (numPartitionsCandidate != null) {
-                                        if (numPartitions == null || numPartitionsCandidate > numPartitions) {
-                                            numPartitions = numPartitionsCandidate;
-                                        }
-                                    }
-                                }
-                            }
-                        }
-
-                        // if we still have not found the right number of partitions,
-                        // another iteration is needed
-                        if (numPartitions == null) {
-                            numPartitionsNeeded = true;
-                        } else {
-                            repartitionTopicMetadata.get(topicName).setNumberOfPartitions(numPartitions);
-                        }
-                    }
+                                                                final Cluster metadata) {
+        final Set<String> allRepartitionSourceTopics = new HashSet<>();
+        final Map<String, TopicNode> builtTopicNodes = new HashMap<>();
+        // 1.  Build a graph contains the TopicsInfo and TopicsNode
+        for (final TopicsInfo topicsInfo : topicGroups.values()) {
+            for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
+                allRepartitionSourceTopics.add(topicName);
+            }
+            for (final String sourceTopic : topicsInfo.sourceTopics) {
+                if (builtTopicNodes.containsKey(sourceTopic)) {
+                    builtTopicNodes.get(sourceTopic).addDownStreamTopicsInfo(topicsInfo);
+                } else {
+                    final TopicNode topicNode = new TopicNode(sourceTopic);
+                    topicNode.addDownStreamTopicsInfo(topicsInfo);
+                    builtTopicNodes.put(sourceTopic, topicNode);
+                }
+            }
+
+            for (final String sinkTopic : topicsInfo.sinkTopics) {
+                if (builtTopicNodes.containsKey(sinkTopic)) {
+                    builtTopicNodes.get(sinkTopic).addUpStreamTopicsInfo(topicsInfo);
+                } else {
+                    final TopicNode topicNode = new TopicNode(sinkTopic);
+                    topicNode.addUpStreamTopicsInfo(topicsInfo);
+                    builtTopicNodes.put(sinkTopic, topicNode);
                 }
             }
-        } while (numPartitionsNeeded);
+        }
+
+        // 2. Use DFS along with memoization to calc repartition number of all repartitionSourceTopics

Review comment:
       memoization -> memorization ?
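
For context on the term in that comment line: memoization means caching each result once it has been computed, so a depth-first traversal resolves every node only once. Below is a minimal sketch of that idea, not the PR's implementation. It assumes an acyclic graph and collapses the `TopicsInfo` groups into direct topic-to-topic edges; `MemoizedDfsSketch`, `upstream`, `known` and `memo` are hypothetical names used only for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class MemoizedDfsSketch {

    // Returns the partition count for `topic`.
    // `known` holds counts for external topics; `upstream` lists the topics feeding each internal topic.
    // Assumption: the graph is acyclic, otherwise this recursion would not terminate.
    static int partitions(final String topic,
                          final Map<String, List<String>> upstream,
                          final Map<String, Integer> known,
                          final Map<String, Integer> memo) {
        final Integer cached = memo.get(topic);
        if (cached != null) {
            return cached; // already resolved during an earlier visit
        }

        final int result;
        final Integer fixed = known.get(topic);
        if (fixed != null) {
            result = fixed;
        } else {
            final List<String> sources = upstream.get(topic);
            if (sources == null || sources.isEmpty()) {
                throw new IllegalStateException("No partition count found for topic " + topic);
            }
            int max = 0;
            for (final String source : sources) {
                // depth-first: resolve each upstream topic before this one
                max = Math.max(max, partitions(source, upstream, known, memo));
            }
            result = max;
        }

        memo.put(topic, result);
        return result;
    }

    public static void main(final String[] args) {
        final Map<String, List<String>> upstream = new HashMap<>();
        upstream.put("t3", Arrays.asList("t1", "t2"));
        upstream.put("t4", Arrays.asList("t1", "t2", "t6", "t7"));

        final Map<String, Integer> known = new HashMap<>();
        known.put("t1", 2);
        known.put("t2", 4);
        known.put("t6", 3);
        known.put("t7", 6);

        final Map<String, Integer> memo = new HashMap<>();
        System.out.println("t3 = " + partitions("t3", upstream, known, memo));
        System.out.println("t4 = " + partitions("t4", upstream, known, memo));
    }
}
```

The cache is read with `get` and written with `put` rather than through a recursive `computeIfAbsent`, because the mapping function passed to `HashMap.computeIfAbsent` should not modify the map while it runs.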

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -147,6 +147,46 @@ public String toString() {
         }
     }
 
+    /**
+     * TopicNode is the a topic node abstraction for graph built with TopicNode and TopicsInfo, the graph is useful

Review comment:
       `is the a topic node` -> `is a topic node`

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -147,6 +147,46 @@ public String toString() {
         }
     }
 
+    /**
+     * TopicNode is the a topic node abstraction for graph built with TopicNode and TopicsInfo, the graph is useful
+     * when in certain cases traverse is needed. For example, method setRepartitionTopicMetadataNumberOfPartitions

Review comment:
       Use a Javadoc link:
   `method setRepartitionTopicMetadataNumberOfPartitions` -> `{@link #setRepartitionTopicMetadataNumberOfPartitions}`

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -513,66 +553,94 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion,
      */
     private void setRepartitionTopicMetadataNumberOfPartitions(final Map<String, InternalTopicConfig> repartitionTopicMetadata,
                                                                final Map<Integer, TopicsInfo> topicGroups,
-                                                               final Cluster metadata) {
-        boolean numPartitionsNeeded;
-        do {
-            numPartitionsNeeded = false;
-
-            for (final TopicsInfo topicsInfo : topicGroups.values()) {
-                for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
-                    final Optional<Integer> maybeNumPartitions = repartitionTopicMetadata.get(topicName)
-                                                                     .numberOfPartitions();
-                    Integer numPartitions = null;
-
-                    if (!maybeNumPartitions.isPresent()) {
-                        // try set the number of partitions for this repartition topic if it is not set yet
-                        for (final TopicsInfo otherTopicsInfo : topicGroups.values()) {
-                            final Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
-
-                            if (otherSinkTopics.contains(topicName)) {
-                                // if this topic is one of the sink topics of this topology,
-                                // use the maximum of all its source topic partitions as the number of partitions
-                                for (final String sourceTopicName : otherTopicsInfo.sourceTopics) {
-                                    Integer numPartitionsCandidate = null;
-                                    // It is possible the sourceTopic is another internal topic, i.e,
-                                    // map().join().join(map())
-                                    if (repartitionTopicMetadata.containsKey(sourceTopicName)) {
-                                        if (repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()) {
-                                            numPartitionsCandidate =
-                                                repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get();
-                                        }
-                                    } else {
-                                        final Integer count = metadata.partitionCountForTopic(sourceTopicName);
-                                        if (count == null) {
-                                            throw new IllegalStateException(
-                                                "No partition count found for source topic "
-                                                    + sourceTopicName
-                                                    + ", but it should have been."
-                                            );
-                                        }
-                                        numPartitionsCandidate = count;
-                                    }
-
-                                    if (numPartitionsCandidate != null) {
-                                        if (numPartitions == null || numPartitionsCandidate > numPartitions) {
-                                            numPartitions = numPartitionsCandidate;
-                                        }
-                                    }
-                                }
-                            }
-                        }
-
-                        // if we still have not found the right number of partitions,
-                        // another iteration is needed
-                        if (numPartitions == null) {
-                            numPartitionsNeeded = true;
-                        } else {
-                            repartitionTopicMetadata.get(topicName).setNumberOfPartitions(numPartitions);
-                        }
-                    }
+                                                                final Cluster metadata) {

Review comment:
       nit: fix the formatting of this line

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -513,66 +553,94 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion,
      */
     private void setRepartitionTopicMetadataNumberOfPartitions(final Map<String, InternalTopicConfig> repartitionTopicMetadata,
                                                                final Map<Integer, TopicsInfo> topicGroups,
-                                                               final Cluster metadata) {
-        boolean numPartitionsNeeded;
-        do {
-            numPartitionsNeeded = false;
-
-            for (final TopicsInfo topicsInfo : topicGroups.values()) {
-                for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
-                    final Optional<Integer> maybeNumPartitions = repartitionTopicMetadata.get(topicName)
-                                                                     .numberOfPartitions();
-                    Integer numPartitions = null;
-
-                    if (!maybeNumPartitions.isPresent()) {
-                        // try set the number of partitions for this repartition topic if it is not set yet
-                        for (final TopicsInfo otherTopicsInfo : topicGroups.values()) {
-                            final Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
-
-                            if (otherSinkTopics.contains(topicName)) {
-                                // if this topic is one of the sink topics of this topology,
-                                // use the maximum of all its source topic partitions as the number of partitions
-                                for (final String sourceTopicName : otherTopicsInfo.sourceTopics) {
-                                    Integer numPartitionsCandidate = null;
-                                    // It is possible the sourceTopic is another internal topic, i.e,
-                                    // map().join().join(map())
-                                    if (repartitionTopicMetadata.containsKey(sourceTopicName)) {
-                                        if (repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()) {
-                                            numPartitionsCandidate =
-                                                repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get();
-                                        }
-                                    } else {
-                                        final Integer count = metadata.partitionCountForTopic(sourceTopicName);
-                                        if (count == null) {
-                                            throw new IllegalStateException(
-                                                "No partition count found for source topic "
-                                                    + sourceTopicName
-                                                    + ", but it should have been."
-                                            );
-                                        }
-                                        numPartitionsCandidate = count;
-                                    }
-
-                                    if (numPartitionsCandidate != null) {
-                                        if (numPartitions == null || numPartitionsCandidate > numPartitions) {
-                                            numPartitions = numPartitionsCandidate;
-                                        }
-                                    }
-                                }
-                            }
-                        }
-
-                        // if we still have not found the right number of partitions,
-                        // another iteration is needed
-                        if (numPartitions == null) {
-                            numPartitionsNeeded = true;
-                        } else {
-                            repartitionTopicMetadata.get(topicName).setNumberOfPartitions(numPartitions);
-                        }
-                    }
+                                                                final Cluster metadata) {
+        final Set<String> allRepartitionSourceTopics = new HashSet<>();
+        final Map<String, TopicNode> builtTopicNodes = new HashMap<>();
+        // 1.  Build a graph contains the TopicsInfo and TopicsNode
+        for (final TopicsInfo topicsInfo : topicGroups.values()) {
+            for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
+                allRepartitionSourceTopics.add(topicName);
+            }
+            for (final String sourceTopic : topicsInfo.sourceTopics) {
+                if (builtTopicNodes.containsKey(sourceTopic)) {

Review comment:
       We could use `computeIfAbsent` here instead of the `containsKey`/`get`/`put` branching.
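
A minimal sketch of what that suggestion could look like, assuming a simplified stand-in for the node type. `SketchTopicNode`, `ComputeIfAbsentSketch` and `addDownStream` are hypothetical names, and the downstream group is represented by a plain `String` rather than a `TopicsInfo`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the PR's TopicNode; it carries only what the example needs.
final class SketchTopicNode {
    final String topicName;
    final List<String> downStreamGroups = new ArrayList<>();

    SketchTopicNode(final String topicName) {
        this.topicName = topicName;
    }
}

final class ComputeIfAbsentSketch {

    // Registers groupName as a downstream group of sourceTopic.
    // computeIfAbsent creates the node the first time the topic is seen and returns the existing one afterwards.
    static void addDownStream(final Map<String, SketchTopicNode> nodes,
                              final String sourceTopic,
                              final String groupName) {
        nodes.computeIfAbsent(sourceTopic, SketchTopicNode::new)
             .downStreamGroups
             .add(groupName);
    }

    public static void main(final String[] args) {
        final Map<String, SketchTopicNode> nodes = new HashMap<>();
        addDownStream(nodes, "t1", "group-0");
        addDownStream(nodes, "t1", "group-1");
        addDownStream(nodes, "t2", "group-0");
        System.out.println(nodes.get("t1").downStreamGroups);
    }
}
```

The same one-liner shape would apply to the sink-topic loop, with the upstream list in place of the downstream one.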

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -513,66 +553,94 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion,
      */
     private void setRepartitionTopicMetadataNumberOfPartitions(final Map<String, InternalTopicConfig> repartitionTopicMetadata,
                                                                final Map<Integer, TopicsInfo> topicGroups,
-                                                               final Cluster metadata) {
-        boolean numPartitionsNeeded;
-        do {
-            numPartitionsNeeded = false;
-
-            for (final TopicsInfo topicsInfo : topicGroups.values()) {
-                for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
-                    final Optional<Integer> maybeNumPartitions = repartitionTopicMetadata.get(topicName)
-                                                                     .numberOfPartitions();
-                    Integer numPartitions = null;
-
-                    if (!maybeNumPartitions.isPresent()) {
-                        // try set the number of partitions for this repartition topic if it is not set yet
-                        for (final TopicsInfo otherTopicsInfo : topicGroups.values()) {
-                            final Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
-
-                            if (otherSinkTopics.contains(topicName)) {
-                                // if this topic is one of the sink topics of this topology,
-                                // use the maximum of all its source topic partitions as the number of partitions
-                                for (final String sourceTopicName : otherTopicsInfo.sourceTopics) {
-                                    Integer numPartitionsCandidate = null;
-                                    // It is possible the sourceTopic is another internal topic, i.e,
-                                    // map().join().join(map())
-                                    if (repartitionTopicMetadata.containsKey(sourceTopicName)) {
-                                        if (repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()) {
-                                            numPartitionsCandidate =
-                                                repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get();
-                                        }
-                                    } else {
-                                        final Integer count = metadata.partitionCountForTopic(sourceTopicName);
-                                        if (count == null) {
-                                            throw new IllegalStateException(
-                                                "No partition count found for source topic "
-                                                    + sourceTopicName
-                                                    + ", but it should have been."
-                                            );
-                                        }
-                                        numPartitionsCandidate = count;
-                                    }
-
-                                    if (numPartitionsCandidate != null) {
-                                        if (numPartitions == null || numPartitionsCandidate > numPartitions) {
-                                            numPartitions = numPartitionsCandidate;
-                                        }
-                                    }
-                                }
-                            }
-                        }
-
-                        // if we still have not found the right number of partitions,
-                        // another iteration is needed
-                        if (numPartitions == null) {
-                            numPartitionsNeeded = true;
-                        } else {
-                            repartitionTopicMetadata.get(topicName).setNumberOfPartitions(numPartitions);
-                        }
-                    }
+                                                                final Cluster metadata) {
+        final Set<String> allRepartitionSourceTopics = new HashSet<>();
+        final Map<String, TopicNode> builtTopicNodes = new HashMap<>();
+        // 1.  Build a graph contains the TopicsInfo and TopicsNode

Review comment:
       Should read `a graph containing`; also fix the extra space in `1.  Build`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -513,66 +553,94 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion,
      */
     private void setRepartitionTopicMetadataNumberOfPartitions(final Map<String, InternalTopicConfig> repartitionTopicMetadata,
                                                                final Map<Integer, TopicsInfo> topicGroups,
-                                                               final Cluster metadata) {
-        boolean numPartitionsNeeded;
-        do {
-            numPartitionsNeeded = false;
-
-            for (final TopicsInfo topicsInfo : topicGroups.values()) {
-                for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
-                    final Optional<Integer> maybeNumPartitions = repartitionTopicMetadata.get(topicName)
-                                                                     .numberOfPartitions();
-                    Integer numPartitions = null;
-
-                    if (!maybeNumPartitions.isPresent()) {
-                        // try set the number of partitions for this repartition topic if it is not set yet
-                        for (final TopicsInfo otherTopicsInfo : topicGroups.values()) {
-                            final Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
-
-                            if (otherSinkTopics.contains(topicName)) {
-                                // if this topic is one of the sink topics of this topology,
-                                // use the maximum of all its source topic partitions as the number of partitions
-                                for (final String sourceTopicName : otherTopicsInfo.sourceTopics) {
-                                    Integer numPartitionsCandidate = null;
-                                    // It is possible the sourceTopic is another internal topic, i.e,
-                                    // map().join().join(map())
-                                    if (repartitionTopicMetadata.containsKey(sourceTopicName)) {
-                                        if (repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()) {
-                                            numPartitionsCandidate =
-                                                repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get();
-                                        }
-                                    } else {
-                                        final Integer count = metadata.partitionCountForTopic(sourceTopicName);
-                                        if (count == null) {
-                                            throw new IllegalStateException(
-                                                "No partition count found for source topic "
-                                                    + sourceTopicName
-                                                    + ", but it should have been."
-                                            );
-                                        }
-                                        numPartitionsCandidate = count;
-                                    }
-
-                                    if (numPartitionsCandidate != null) {
-                                        if (numPartitions == null || numPartitionsCandidate > numPartitions) {
-                                            numPartitions = numPartitionsCandidate;
-                                        }
-                                    }
-                                }
-                            }
-                        }
-
-                        // if we still have not found the right number of partitions,
-                        // another iteration is needed
-                        if (numPartitions == null) {
-                            numPartitionsNeeded = true;
-                        } else {
-                            repartitionTopicMetadata.get(topicName).setNumberOfPartitions(numPartitions);
-                        }
-                    }
+                                                                final Cluster metadata) {
+        final Set<String> allRepartitionSourceTopics = new HashSet<>();
+        final Map<String, TopicNode> builtTopicNodes = new HashMap<>();
+        // 1.  Build a graph contains the TopicsInfo and TopicsNode
+        for (final TopicsInfo topicsInfo : topicGroups.values()) {
+            for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
+                allRepartitionSourceTopics.add(topicName);
+            }
+            for (final String sourceTopic : topicsInfo.sourceTopics) {
+                if (builtTopicNodes.containsKey(sourceTopic)) {
+                    builtTopicNodes.get(sourceTopic).addDownStreamTopicsInfo(topicsInfo);
+                } else {
+                    final TopicNode topicNode = new TopicNode(sourceTopic);
+                    topicNode.addDownStreamTopicsInfo(topicsInfo);
+                    builtTopicNodes.put(sourceTopic, topicNode);
+                }
+            }
+
+            for (final String sinkTopic : topicsInfo.sinkTopics) {
+                if (builtTopicNodes.containsKey(sinkTopic)) {
+                    builtTopicNodes.get(sinkTopic).addUpStreamTopicsInfo(topicsInfo);
+                } else {
+                    final TopicNode topicNode = new TopicNode(sinkTopic);
+                    topicNode.addUpStreamTopicsInfo(topicsInfo);
+                    builtTopicNodes.put(sinkTopic, topicNode);
                 }
             }
-        } while (numPartitionsNeeded);
+        }
+
+        // 2. Use DFS along with memoization to calc repartition number of all repartitionSourceTopics
+        for (final String topic : allRepartitionSourceTopics) {
+            calcRepartitionNumForTopic(topic, repartitionTopicMetadata, metadata, builtTopicNodes, new HashMap<TopicsInfo, Integer>());
+        }
+    }
+
+    private int calcRepartitionNumForTopic(final String topic,
+                                           final Map<String, InternalTopicConfig> repartitionTopicMetadata,
+                                           final Cluster metadata,
+                                           final Map<String, TopicNode> builtTopicNodes,
+                                           final Map<TopicsInfo, Integer> topicsInfoNumberOfPartitions) {

Review comment:
       Does `TopicsInfo` work as a map key? Looking at its overridden `equals`, it doesn't seem to check all fields for equality.
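
To illustrate the concern in general terms, here is a sketch using a hypothetical key class, not the real `TopicsInfo`. `PartialKey` deliberately ignores `sinkTopics` in `equals` and `hashCode`, so two keys that differ only in their sinks collapse into a single `HashMap` entry.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical key type for illustration only; equals/hashCode look at sourceTopics and nothing else.
final class PartialKey {
    final Set<String> sourceTopics;
    final Set<String> sinkTopics;

    PartialKey(final Set<String> sourceTopics, final Set<String> sinkTopics) {
        this.sourceTopics = sourceTopics;
        this.sinkTopics = sinkTopics;
    }

    @Override
    public boolean equals(final Object o) {
        if (!(o instanceof PartialKey)) {
            return false;
        }
        return sourceTopics.equals(((PartialKey) o).sourceTopics);
    }

    @Override
    public int hashCode() {
        return Objects.hash(sourceTopics);
    }
}

final class MapKeySketch {

    // Stores one value per key and returns how many entries the map ends up holding.
    static int distinctEntries(final PartialKey a, final PartialKey b) {
        final Map<PartialKey, Integer> memo = new HashMap<>();
        memo.put(a, 4);
        memo.put(b, 8); // replaces the value stored under a whenever a.equals(b)
        return memo.size();
    }

    public static void main(final String[] args) {
        final PartialKey first = new PartialKey(Collections.singleton("t1"), Collections.singleton("t3"));
        final PartialKey second = new PartialKey(Collections.singleton("t1"), Collections.singleton("t4"));
        System.out.println(distinctEntries(first, second));
    }
}
```

Whether this matters for the memoization map depends on whether two distinct groups can ever compare equal under the fields that `equals` actually checks.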



