You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/09 16:33:04 UTC

[GitHub] [kafka] feyman2016 opened a new pull request #8832: KAFKA-9377: Refactor StreamsPartitionAssignor Repartition Count Logic

feyman2016 opened a new pull request #8832:
URL: https://github.com/apache/kafka/pull/8832


   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] feyman2016 edited a comment on pull request #8832: KAFKA-9377: Refactor StreamsPartitionAssignor Repartition Count Logic

Posted by GitBox <gi...@apache.org>.
feyman2016 edited a comment on pull request #8832:
URL: https://github.com/apache/kafka/pull/8832#issuecomment-641392176


   @abbccdda Thanks for the comments, I updated the PR with few comments and added algo description. Since the algo is still under discussion, I can add the unit tests later once we have the conclusion. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] feyman2016 edited a comment on pull request #8832: KAFKA-9377: Refactor StreamsPartitionAssignor Repartition Count Logic

Posted by GitBox <gi...@apache.org>.
feyman2016 edited a comment on pull request #8832:
URL: https://github.com/apache/kafka/pull/8832#issuecomment-647876499


   ```
   [ant:checkstyle] [ERROR] /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk8-scala2.12/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:84:1: Class Fan-Out Complexity is 52 (max allowed is 50). [ClassFanOutComplexity]
   ```
   
   Wired the ClassFanOutComplexity violation was not captured during my local run of checkstyle:
   ```
   ./gradlew checkstyleMain checkstyleTest spotbugsMain spotbugsTest spotbugsScoverage compileTestJava
   ```
   Should I suppress the `ClassFanOutComplexity` check for `StreamsPartitionAssignor`?  


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] feyman2016 commented on pull request #8832: KAFKA-9377: Refactor StreamsPartitionAssignor Repartition Count Logic

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on pull request #8832:
URL: https://github.com/apache/kafka/pull/8832#issuecomment-647876499


   ```
   [ant:checkstyle] [ERROR] /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk8-scala2.12/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:84:1: Class Fan-Out Complexity is 52 (max allowed is 50). [ClassFanOutComplexity]
   ```
   
   Wired the ClassFanOutComplexity was not captured during my local run of checkstyle:
   ```
   ./gradlew checkstyleMain checkstyleTest spotbugsMain spotbugsTest spotbugsScoverage compileTestJava
   ```
   Should I suppress the `ClassFanOutComplexity` check for `StreamsPartitionAssignor`?  


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] feyman2016 commented on a change in pull request #8832: KAFKA-9377: Refactor StreamsPartitionAssignor Repartition Count Logic

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on a change in pull request #8832:
URL: https://github.com/apache/kafka/pull/8832#discussion_r437529829



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -147,6 +147,46 @@ public String toString() {
         }
     }
 
+    /**
+     * TopicNode is the a topic node abstraction for graph built with TopicNode and TopicsInfo, the graph is useful

Review comment:
       Fixed

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -147,6 +147,46 @@ public String toString() {
         }
     }
 
+    /**
+     * TopicNode is the a topic node abstraction for graph built with TopicNode and TopicsInfo, the graph is useful

Review comment:
       Updated

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -513,66 +553,94 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion,
      */
     private void setRepartitionTopicMetadataNumberOfPartitions(final Map<String, InternalTopicConfig> repartitionTopicMetadata,
                                                                final Map<Integer, TopicsInfo> topicGroups,
-                                                               final Cluster metadata) {
-        boolean numPartitionsNeeded;
-        do {
-            numPartitionsNeeded = false;
-
-            for (final TopicsInfo topicsInfo : topicGroups.values()) {
-                for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
-                    final Optional<Integer> maybeNumPartitions = repartitionTopicMetadata.get(topicName)
-                                                                     .numberOfPartitions();
-                    Integer numPartitions = null;
-
-                    if (!maybeNumPartitions.isPresent()) {
-                        // try set the number of partitions for this repartition topic if it is not set yet
-                        for (final TopicsInfo otherTopicsInfo : topicGroups.values()) {
-                            final Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
-
-                            if (otherSinkTopics.contains(topicName)) {
-                                // if this topic is one of the sink topics of this topology,
-                                // use the maximum of all its source topic partitions as the number of partitions
-                                for (final String sourceTopicName : otherTopicsInfo.sourceTopics) {
-                                    Integer numPartitionsCandidate = null;
-                                    // It is possible the sourceTopic is another internal topic, i.e,
-                                    // map().join().join(map())
-                                    if (repartitionTopicMetadata.containsKey(sourceTopicName)) {
-                                        if (repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()) {
-                                            numPartitionsCandidate =
-                                                repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get();
-                                        }
-                                    } else {
-                                        final Integer count = metadata.partitionCountForTopic(sourceTopicName);
-                                        if (count == null) {
-                                            throw new IllegalStateException(
-                                                "No partition count found for source topic "
-                                                    + sourceTopicName
-                                                    + ", but it should have been."
-                                            );
-                                        }
-                                        numPartitionsCandidate = count;
-                                    }
-
-                                    if (numPartitionsCandidate != null) {
-                                        if (numPartitions == null || numPartitionsCandidate > numPartitions) {
-                                            numPartitions = numPartitionsCandidate;
-                                        }
-                                    }
-                                }
-                            }
-                        }
-
-                        // if we still have not found the right number of partitions,
-                        // another iteration is needed
-                        if (numPartitions == null) {
-                            numPartitionsNeeded = true;
-                        } else {
-                            repartitionTopicMetadata.get(topicName).setNumberOfPartitions(numPartitions);
-                        }
-                    }
+                                                                final Cluster metadata) {

Review comment:
       fixed

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -147,6 +147,46 @@ public String toString() {
         }
     }
 
+    /**
+     * TopicNode is the a topic node abstraction for graph built with TopicNode and TopicsInfo, the graph is useful
+     * when in certain cases traverse is needed. For example, method setRepartitionTopicMetadataNumberOfPartitions

Review comment:
       Updated

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -513,66 +553,94 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion,
      */
     private void setRepartitionTopicMetadataNumberOfPartitions(final Map<String, InternalTopicConfig> repartitionTopicMetadata,
                                                                final Map<Integer, TopicsInfo> topicGroups,
-                                                               final Cluster metadata) {
-        boolean numPartitionsNeeded;
-        do {
-            numPartitionsNeeded = false;
-
-            for (final TopicsInfo topicsInfo : topicGroups.values()) {
-                for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
-                    final Optional<Integer> maybeNumPartitions = repartitionTopicMetadata.get(topicName)
-                                                                     .numberOfPartitions();
-                    Integer numPartitions = null;
-
-                    if (!maybeNumPartitions.isPresent()) {
-                        // try set the number of partitions for this repartition topic if it is not set yet
-                        for (final TopicsInfo otherTopicsInfo : topicGroups.values()) {
-                            final Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
-
-                            if (otherSinkTopics.contains(topicName)) {
-                                // if this topic is one of the sink topics of this topology,
-                                // use the maximum of all its source topic partitions as the number of partitions
-                                for (final String sourceTopicName : otherTopicsInfo.sourceTopics) {
-                                    Integer numPartitionsCandidate = null;
-                                    // It is possible the sourceTopic is another internal topic, i.e,
-                                    // map().join().join(map())
-                                    if (repartitionTopicMetadata.containsKey(sourceTopicName)) {
-                                        if (repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()) {
-                                            numPartitionsCandidate =
-                                                repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get();
-                                        }
-                                    } else {
-                                        final Integer count = metadata.partitionCountForTopic(sourceTopicName);
-                                        if (count == null) {
-                                            throw new IllegalStateException(
-                                                "No partition count found for source topic "
-                                                    + sourceTopicName
-                                                    + ", but it should have been."
-                                            );
-                                        }
-                                        numPartitionsCandidate = count;
-                                    }
-
-                                    if (numPartitionsCandidate != null) {
-                                        if (numPartitions == null || numPartitionsCandidate > numPartitions) {
-                                            numPartitions = numPartitionsCandidate;
-                                        }
-                                    }
-                                }
-                            }
-                        }
-
-                        // if we still have not found the right number of partitions,
-                        // another iteration is needed
-                        if (numPartitions == null) {
-                            numPartitionsNeeded = true;
-                        } else {
-                            repartitionTopicMetadata.get(topicName).setNumberOfPartitions(numPartitions);
-                        }
-                    }
+                                                                final Cluster metadata) {
+        final Set<String> allRepartitionSourceTopics = new HashSet<>();
+        final Map<String, TopicNode> builtTopicNodes = new HashMap<>();
+        // 1.  Build a graph contains the TopicsInfo and TopicsNode
+        for (final TopicsInfo topicsInfo : topicGroups.values()) {
+            for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {

Review comment:
       cool, updated

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -513,66 +553,94 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion,
      */
     private void setRepartitionTopicMetadataNumberOfPartitions(final Map<String, InternalTopicConfig> repartitionTopicMetadata,
                                                                final Map<Integer, TopicsInfo> topicGroups,
-                                                               final Cluster metadata) {
-        boolean numPartitionsNeeded;
-        do {
-            numPartitionsNeeded = false;
-
-            for (final TopicsInfo topicsInfo : topicGroups.values()) {
-                for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
-                    final Optional<Integer> maybeNumPartitions = repartitionTopicMetadata.get(topicName)
-                                                                     .numberOfPartitions();
-                    Integer numPartitions = null;
-
-                    if (!maybeNumPartitions.isPresent()) {
-                        // try set the number of partitions for this repartition topic if it is not set yet
-                        for (final TopicsInfo otherTopicsInfo : topicGroups.values()) {
-                            final Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
-
-                            if (otherSinkTopics.contains(topicName)) {
-                                // if this topic is one of the sink topics of this topology,
-                                // use the maximum of all its source topic partitions as the number of partitions
-                                for (final String sourceTopicName : otherTopicsInfo.sourceTopics) {
-                                    Integer numPartitionsCandidate = null;
-                                    // It is possible the sourceTopic is another internal topic, i.e,
-                                    // map().join().join(map())
-                                    if (repartitionTopicMetadata.containsKey(sourceTopicName)) {
-                                        if (repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()) {
-                                            numPartitionsCandidate =
-                                                repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get();
-                                        }
-                                    } else {
-                                        final Integer count = metadata.partitionCountForTopic(sourceTopicName);
-                                        if (count == null) {
-                                            throw new IllegalStateException(
-                                                "No partition count found for source topic "
-                                                    + sourceTopicName
-                                                    + ", but it should have been."
-                                            );
-                                        }
-                                        numPartitionsCandidate = count;
-                                    }
-
-                                    if (numPartitionsCandidate != null) {
-                                        if (numPartitions == null || numPartitionsCandidate > numPartitions) {
-                                            numPartitions = numPartitionsCandidate;
-                                        }
-                                    }
-                                }
-                            }
-                        }
-
-                        // if we still have not found the right number of partitions,
-                        // another iteration is needed
-                        if (numPartitions == null) {
-                            numPartitionsNeeded = true;
-                        } else {
-                            repartitionTopicMetadata.get(topicName).setNumberOfPartitions(numPartitions);
-                        }
-                    }
+                                                                final Cluster metadata) {
+        final Set<String> allRepartitionSourceTopics = new HashSet<>();
+        final Map<String, TopicNode> builtTopicNodes = new HashMap<>();
+        // 1.  Build a graph contains the TopicsInfo and TopicsNode
+        for (final TopicsInfo topicsInfo : topicGroups.values()) {
+            for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
+                allRepartitionSourceTopics.add(topicName);
+            }
+            for (final String sourceTopic : topicsInfo.sourceTopics) {
+                if (builtTopicNodes.containsKey(sourceTopic)) {

Review comment:
       updated

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -513,66 +553,94 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion,
      */
     private void setRepartitionTopicMetadataNumberOfPartitions(final Map<String, InternalTopicConfig> repartitionTopicMetadata,
                                                                final Map<Integer, TopicsInfo> topicGroups,
-                                                               final Cluster metadata) {
-        boolean numPartitionsNeeded;
-        do {
-            numPartitionsNeeded = false;
-
-            for (final TopicsInfo topicsInfo : topicGroups.values()) {
-                for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
-                    final Optional<Integer> maybeNumPartitions = repartitionTopicMetadata.get(topicName)
-                                                                     .numberOfPartitions();
-                    Integer numPartitions = null;
-
-                    if (!maybeNumPartitions.isPresent()) {
-                        // try set the number of partitions for this repartition topic if it is not set yet
-                        for (final TopicsInfo otherTopicsInfo : topicGroups.values()) {
-                            final Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
-
-                            if (otherSinkTopics.contains(topicName)) {
-                                // if this topic is one of the sink topics of this topology,
-                                // use the maximum of all its source topic partitions as the number of partitions
-                                for (final String sourceTopicName : otherTopicsInfo.sourceTopics) {
-                                    Integer numPartitionsCandidate = null;
-                                    // It is possible the sourceTopic is another internal topic, i.e,
-                                    // map().join().join(map())
-                                    if (repartitionTopicMetadata.containsKey(sourceTopicName)) {
-                                        if (repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()) {
-                                            numPartitionsCandidate =
-                                                repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get();
-                                        }
-                                    } else {
-                                        final Integer count = metadata.partitionCountForTopic(sourceTopicName);
-                                        if (count == null) {
-                                            throw new IllegalStateException(
-                                                "No partition count found for source topic "
-                                                    + sourceTopicName
-                                                    + ", but it should have been."
-                                            );
-                                        }
-                                        numPartitionsCandidate = count;
-                                    }
-
-                                    if (numPartitionsCandidate != null) {
-                                        if (numPartitions == null || numPartitionsCandidate > numPartitions) {
-                                            numPartitions = numPartitionsCandidate;
-                                        }
-                                    }
-                                }
-                            }
-                        }
-
-                        // if we still have not found the right number of partitions,
-                        // another iteration is needed
-                        if (numPartitions == null) {
-                            numPartitionsNeeded = true;
-                        } else {
-                            repartitionTopicMetadata.get(topicName).setNumberOfPartitions(numPartitions);
-                        }
-                    }
+                                                                final Cluster metadata) {
+        final Set<String> allRepartitionSourceTopics = new HashSet<>();
+        final Map<String, TopicNode> builtTopicNodes = new HashMap<>();
+        // 1.  Build a graph contains the TopicsInfo and TopicsNode
+        for (final TopicsInfo topicsInfo : topicGroups.values()) {
+            for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
+                allRepartitionSourceTopics.add(topicName);
+            }
+            for (final String sourceTopic : topicsInfo.sourceTopics) {
+                if (builtTopicNodes.containsKey(sourceTopic)) {
+                    builtTopicNodes.get(sourceTopic).addDownStreamTopicsInfo(topicsInfo);
+                } else {
+                    final TopicNode topicNode = new TopicNode(sourceTopic);
+                    topicNode.addDownStreamTopicsInfo(topicsInfo);
+                    builtTopicNodes.put(sourceTopic, topicNode);
+                }
+            }
+
+            for (final String sinkTopic : topicsInfo.sinkTopics) {
+                if (builtTopicNodes.containsKey(sinkTopic)) {

Review comment:
       fixed also

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -513,66 +553,94 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion,
      */
     private void setRepartitionTopicMetadataNumberOfPartitions(final Map<String, InternalTopicConfig> repartitionTopicMetadata,
                                                                final Map<Integer, TopicsInfo> topicGroups,
-                                                               final Cluster metadata) {
-        boolean numPartitionsNeeded;
-        do {
-            numPartitionsNeeded = false;
-
-            for (final TopicsInfo topicsInfo : topicGroups.values()) {
-                for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
-                    final Optional<Integer> maybeNumPartitions = repartitionTopicMetadata.get(topicName)
-                                                                     .numberOfPartitions();
-                    Integer numPartitions = null;
-
-                    if (!maybeNumPartitions.isPresent()) {
-                        // try set the number of partitions for this repartition topic if it is not set yet
-                        for (final TopicsInfo otherTopicsInfo : topicGroups.values()) {
-                            final Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
-
-                            if (otherSinkTopics.contains(topicName)) {
-                                // if this topic is one of the sink topics of this topology,
-                                // use the maximum of all its source topic partitions as the number of partitions
-                                for (final String sourceTopicName : otherTopicsInfo.sourceTopics) {
-                                    Integer numPartitionsCandidate = null;
-                                    // It is possible the sourceTopic is another internal topic, i.e,
-                                    // map().join().join(map())
-                                    if (repartitionTopicMetadata.containsKey(sourceTopicName)) {
-                                        if (repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()) {
-                                            numPartitionsCandidate =
-                                                repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get();
-                                        }
-                                    } else {
-                                        final Integer count = metadata.partitionCountForTopic(sourceTopicName);
-                                        if (count == null) {
-                                            throw new IllegalStateException(
-                                                "No partition count found for source topic "
-                                                    + sourceTopicName
-                                                    + ", but it should have been."
-                                            );
-                                        }
-                                        numPartitionsCandidate = count;
-                                    }
-
-                                    if (numPartitionsCandidate != null) {
-                                        if (numPartitions == null || numPartitionsCandidate > numPartitions) {
-                                            numPartitions = numPartitionsCandidate;
-                                        }
-                                    }
-                                }
-                            }
-                        }
-
-                        // if we still have not found the right number of partitions,
-                        // another iteration is needed
-                        if (numPartitions == null) {
-                            numPartitionsNeeded = true;
-                        } else {
-                            repartitionTopicMetadata.get(topicName).setNumberOfPartitions(numPartitions);
-                        }
-                    }
+                                                                final Cluster metadata) {
+        final Set<String> allRepartitionSourceTopics = new HashSet<>();
+        final Map<String, TopicNode> builtTopicNodes = new HashMap<>();
+        // 1.  Build a graph contains the TopicsInfo and TopicsNode
+        for (final TopicsInfo topicsInfo : topicGroups.values()) {
+            for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
+                allRepartitionSourceTopics.add(topicName);
+            }
+            for (final String sourceTopic : topicsInfo.sourceTopics) {
+                if (builtTopicNodes.containsKey(sourceTopic)) {
+                    builtTopicNodes.get(sourceTopic).addDownStreamTopicsInfo(topicsInfo);
+                } else {
+                    final TopicNode topicNode = new TopicNode(sourceTopic);
+                    topicNode.addDownStreamTopicsInfo(topicsInfo);
+                    builtTopicNodes.put(sourceTopic, topicNode);
+                }
+            }
+
+            for (final String sinkTopic : topicsInfo.sinkTopics) {
+                if (builtTopicNodes.containsKey(sinkTopic)) {
+                    builtTopicNodes.get(sinkTopic).addUpStreamTopicsInfo(topicsInfo);
+                } else {
+                    final TopicNode topicNode = new TopicNode(sinkTopic);
+                    topicNode.addUpStreamTopicsInfo(topicsInfo);
+                    builtTopicNodes.put(sinkTopic, topicNode);
                 }
             }
-        } while (numPartitionsNeeded);
+        }
+
+        // 2. Use DFS along with memoization to calc repartition number of all repartitionSourceTopics

Review comment:
       I think it should be memoization here -> https://en.wikipedia.org/wiki/Memoization

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -147,6 +147,46 @@ public String toString() {
         }
     }
 
+    /**
+     * TopicNode is the a topic node abstraction for graph built with TopicNode and TopicsInfo, the graph is useful
+     * when in certain cases traverse is needed. For example, method setRepartitionTopicMetadataNumberOfPartitions
+     * internally do a DFS search along with the graph.
+     *
+     TopicNode("t1")      TopicNode("t2")                                    TopicNode("t6")             TopicNode("t7")
+                \           /                                                            \                           /
+                  TopicsInfo(source = (t1,t2), sink = (t3,t4))                           TopicsInfo(source = (t6,t7), sink = (t4))
+                                /           \                                                                        /
+                             /                 \                                                          /
+                          /                        \                                           /
+                      /                                \                           /
+                 /                                       \            /
+     TopicNode("t3")                                     TopicNode("t4")
+            \
+     TopicsInfo(source = (t3), sink = ())
+
+     t3 = max(t1,t2)
+     t4 = max(max(t1,t2), max(t6,t7))
+     */
+    private static class TopicNode {
+        public final String topicName;
+        public final Set<TopicsInfo> upStreams; // upStream TopicsInfo's sinkTopics contains this
+        public final Set<TopicsInfo> downStreams; // downStreams TopicsInfo's sourceTopics contains this

Review comment:
       It's used to build the graph together with `TopicsInfo`

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -513,66 +553,94 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion,
      */
     private void setRepartitionTopicMetadataNumberOfPartitions(final Map<String, InternalTopicConfig> repartitionTopicMetadata,
                                                                final Map<Integer, TopicsInfo> topicGroups,
-                                                               final Cluster metadata) {
-        boolean numPartitionsNeeded;
-        do {
-            numPartitionsNeeded = false;
-
-            for (final TopicsInfo topicsInfo : topicGroups.values()) {
-                for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
-                    final Optional<Integer> maybeNumPartitions = repartitionTopicMetadata.get(topicName)
-                                                                     .numberOfPartitions();
-                    Integer numPartitions = null;
-
-                    if (!maybeNumPartitions.isPresent()) {
-                        // try set the number of partitions for this repartition topic if it is not set yet
-                        for (final TopicsInfo otherTopicsInfo : topicGroups.values()) {
-                            final Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
-
-                            if (otherSinkTopics.contains(topicName)) {
-                                // if this topic is one of the sink topics of this topology,
-                                // use the maximum of all its source topic partitions as the number of partitions
-                                for (final String sourceTopicName : otherTopicsInfo.sourceTopics) {
-                                    Integer numPartitionsCandidate = null;
-                                    // It is possible the sourceTopic is another internal topic, i.e,
-                                    // map().join().join(map())
-                                    if (repartitionTopicMetadata.containsKey(sourceTopicName)) {
-                                        if (repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()) {
-                                            numPartitionsCandidate =
-                                                repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get();
-                                        }
-                                    } else {
-                                        final Integer count = metadata.partitionCountForTopic(sourceTopicName);
-                                        if (count == null) {
-                                            throw new IllegalStateException(
-                                                "No partition count found for source topic "
-                                                    + sourceTopicName
-                                                    + ", but it should have been."
-                                            );
-                                        }
-                                        numPartitionsCandidate = count;
-                                    }
-
-                                    if (numPartitionsCandidate != null) {
-                                        if (numPartitions == null || numPartitionsCandidate > numPartitions) {
-                                            numPartitions = numPartitionsCandidate;
-                                        }
-                                    }
-                                }
-                            }
-                        }
-
-                        // if we still have not found the right number of partitions,
-                        // another iteration is needed
-                        if (numPartitions == null) {
-                            numPartitionsNeeded = true;
-                        } else {
-                            repartitionTopicMetadata.get(topicName).setNumberOfPartitions(numPartitions);
-                        }
-                    }
+                                                                final Cluster metadata) {
+        final Set<String> allRepartitionSourceTopics = new HashSet<>();
+        final Map<String, TopicNode> builtTopicNodes = new HashMap<>();
+        // 1.  Build a graph contains the TopicsInfo and TopicsNode
+        for (final TopicsInfo topicsInfo : topicGroups.values()) {
+            for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
+                allRepartitionSourceTopics.add(topicName);
+            }
+            for (final String sourceTopic : topicsInfo.sourceTopics) {
+                if (builtTopicNodes.containsKey(sourceTopic)) {
+                    builtTopicNodes.get(sourceTopic).addDownStreamTopicsInfo(topicsInfo);
+                } else {
+                    final TopicNode topicNode = new TopicNode(sourceTopic);
+                    topicNode.addDownStreamTopicsInfo(topicsInfo);
+                    builtTopicNodes.put(sourceTopic, topicNode);
+                }
+            }
+
+            for (final String sinkTopic : topicsInfo.sinkTopics) {
+                if (builtTopicNodes.containsKey(sinkTopic)) {
+                    builtTopicNodes.get(sinkTopic).addUpStreamTopicsInfo(topicsInfo);
+                } else {
+                    final TopicNode topicNode = new TopicNode(sinkTopic);
+                    topicNode.addUpStreamTopicsInfo(topicsInfo);
+                    builtTopicNodes.put(sinkTopic, topicNode);
                 }
             }
-        } while (numPartitionsNeeded);
+        }
+
+        // 2. Use DFS along with memoization to calc repartition number of all repartitionSourceTopics
+        for (final String topic : allRepartitionSourceTopics) {
+            calcRepartitionNumForTopic(topic, repartitionTopicMetadata, metadata, builtTopicNodes, new HashMap<TopicsInfo, Integer>());
+        }
+    }
+
+    private int calcRepartitionNumForTopic(final String topic,
+                                           final Map<String, InternalTopicConfig> repartitionTopicMetadata,
+                                           final Cluster metadata,
+                                           final Map<String, TopicNode> builtTopicNodes,
+                                           final Map<TopicsInfo, Integer> topicsInfoNumberOfPartitions) {
+
+        if (repartitionTopicMetadata.containsKey(topic)) {
+            final Optional<Integer> maybeNumberPartitions = repartitionTopicMetadata.get(topic).numberOfPartitions();
+            // if numberOfPartitions already calculated, return directly
+            if (maybeNumberPartitions.isPresent()) {
+                return maybeNumberPartitions.get();
+            } else {
+                // else calculate the max numRepartitions of its upStream TopicsInfo and set the repartitionTopicMetadata for memoization before return
+                final TopicNode topicNode = builtTopicNodes.get(topic);
+                Integer maxNumberPartitions = 0;
+                for (final TopicsInfo upstream : topicNode.upStreams) {
+                    final Integer upStreamRepartitionNum = calcRepartitionNumForTopicInfo(upstream, repartitionTopicMetadata,
+                            metadata, builtTopicNodes, topicsInfoNumberOfPartitions);
+                    maxNumberPartitions = upStreamRepartitionNum > maxNumberPartitions ? upStreamRepartitionNum : maxNumberPartitions;
+                }
+                repartitionTopicMetadata.get(topic).setNumberOfPartitions(maxNumberPartitions);
+                return maxNumberPartitions;
+            }
+        } else {
+            final Integer count = metadata.partitionCountForTopic(topic);
+            if (count == null) {
+                throw new IllegalStateException(
+                        "No partition count found for source topic "
+                                + topic
+                                + ", but it should have been."
+                );
+            }
+            return count;
+        }
+    }
+
+    private int calcRepartitionNumForTopicInfo(final TopicsInfo topicsInfo,

Review comment:
       Yeah, seems to be the case, but in the mutual recursive function, it seems acceptable...

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -513,66 +553,94 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion,
      */
     private void setRepartitionTopicMetadataNumberOfPartitions(final Map<String, InternalTopicConfig> repartitionTopicMetadata,
                                                                final Map<Integer, TopicsInfo> topicGroups,
-                                                               final Cluster metadata) {
-        boolean numPartitionsNeeded;
-        do {
-            numPartitionsNeeded = false;
-
-            for (final TopicsInfo topicsInfo : topicGroups.values()) {
-                for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
-                    final Optional<Integer> maybeNumPartitions = repartitionTopicMetadata.get(topicName)
-                                                                     .numberOfPartitions();
-                    Integer numPartitions = null;
-
-                    if (!maybeNumPartitions.isPresent()) {
-                        // try set the number of partitions for this repartition topic if it is not set yet
-                        for (final TopicsInfo otherTopicsInfo : topicGroups.values()) {
-                            final Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
-
-                            if (otherSinkTopics.contains(topicName)) {
-                                // if this topic is one of the sink topics of this topology,
-                                // use the maximum of all its source topic partitions as the number of partitions
-                                for (final String sourceTopicName : otherTopicsInfo.sourceTopics) {
-                                    Integer numPartitionsCandidate = null;
-                                    // It is possible the sourceTopic is another internal topic, i.e,
-                                    // map().join().join(map())
-                                    if (repartitionTopicMetadata.containsKey(sourceTopicName)) {
-                                        if (repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()) {
-                                            numPartitionsCandidate =
-                                                repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get();
-                                        }
-                                    } else {
-                                        final Integer count = metadata.partitionCountForTopic(sourceTopicName);
-                                        if (count == null) {
-                                            throw new IllegalStateException(
-                                                "No partition count found for source topic "
-                                                    + sourceTopicName
-                                                    + ", but it should have been."
-                                            );
-                                        }
-                                        numPartitionsCandidate = count;
-                                    }
-
-                                    if (numPartitionsCandidate != null) {
-                                        if (numPartitions == null || numPartitionsCandidate > numPartitions) {
-                                            numPartitions = numPartitionsCandidate;
-                                        }
-                                    }
-                                }
-                            }
-                        }
-
-                        // if we still have not found the right number of partitions,
-                        // another iteration is needed
-                        if (numPartitions == null) {
-                            numPartitionsNeeded = true;
-                        } else {
-                            repartitionTopicMetadata.get(topicName).setNumberOfPartitions(numPartitions);
-                        }
-                    }
+                                                                final Cluster metadata) {
+        final Set<String> allRepartitionSourceTopics = new HashSet<>();
+        final Map<String, TopicNode> builtTopicNodes = new HashMap<>();
+        // 1.  Build a graph contains the TopicsInfo and TopicsNode
+        for (final TopicsInfo topicsInfo : topicGroups.values()) {
+            for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
+                allRepartitionSourceTopics.add(topicName);
+            }
+            for (final String sourceTopic : topicsInfo.sourceTopics) {
+                if (builtTopicNodes.containsKey(sourceTopic)) {
+                    builtTopicNodes.get(sourceTopic).addDownStreamTopicsInfo(topicsInfo);
+                } else {
+                    final TopicNode topicNode = new TopicNode(sourceTopic);
+                    topicNode.addDownStreamTopicsInfo(topicsInfo);
+                    builtTopicNodes.put(sourceTopic, topicNode);
+                }
+            }
+
+            for (final String sinkTopic : topicsInfo.sinkTopics) {
+                if (builtTopicNodes.containsKey(sinkTopic)) {
+                    builtTopicNodes.get(sinkTopic).addUpStreamTopicsInfo(topicsInfo);
+                } else {
+                    final TopicNode topicNode = new TopicNode(sinkTopic);
+                    topicNode.addUpStreamTopicsInfo(topicsInfo);
+                    builtTopicNodes.put(sinkTopic, topicNode);
                 }
             }
-        } while (numPartitionsNeeded);
+        }
+
+        // 2. Use DFS along with memoization to calc repartition number of all repartitionSourceTopics
+        for (final String topic : allRepartitionSourceTopics) {
+            calcRepartitionNumForTopic(topic, repartitionTopicMetadata, metadata, builtTopicNodes, new HashMap<TopicsInfo, Integer>());
+        }
+    }
+
+    private int calcRepartitionNumForTopic(final String topic,
+                                           final Map<String, InternalTopicConfig> repartitionTopicMetadata,
+                                           final Cluster metadata,
+                                           final Map<String, TopicNode> builtTopicNodes,
+                                           final Map<TopicsInfo, Integer> topicsInfoNumberOfPartitions) {

Review comment:
       Good catch! Seems problematic, I will check it later soon




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #8832: KAFKA-9377: Refactor StreamsPartitionAssignor Repartition Count Logic

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #8832:
URL: https://github.com/apache/kafka/pull/8832#discussion_r445001673



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -511,68 +576,86 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion,
     /**
      * Computes the number of partitions and sets it for each repartition topic in repartitionTopicMetadata
      */
-    private void setRepartitionTopicMetadataNumberOfPartitions(final Map<String, InternalTopicConfig> repartitionTopicMetadata,
+    // visible for testing
+    public void setRepartitionTopicMetadataNumberOfPartitions(final Map<String, InternalTopicConfig> repartitionTopicMetadata,

Review comment:
       package level access should be fine.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -147,6 +147,71 @@ public String toString() {
         }
     }
 
+    /**
+     * A TopicNode is a node that contains topic information and upstream/downstream TopicsInfo. Graph built of TopicNode and TopicsInfoNode is useful
+     * when in certain cases traverse is needed. For example, {@link #setRepartitionTopicMetadataNumberOfPartitions(Map, Map, Cluster)}
+     * internally do a DFS search along with the graph.
+     *
+     TopicNode("t1")      TopicNode("t2")                                    TopicNode("t6")             TopicNode("t7")
+                \           /                                                            \                           /
+           TopicsInfoNode(source = (t1,t2), sink = (t3,t4))                           TopicsInfoNode(source = (t6,t7), sink = (t4))
+                                /           \                                                                        /
+                             /                 \                                                          /
+                          /                        \                                           /
+                      /                                \                           /
+                 /                                       \            /
+     TopicNode("t3")                                     TopicNode("t4")
+            \
+     TopicsInfoNode(source = (t3), sink = ())
+
+     t3 = max(t1,t2)
+     t4 = max(max(t1,t2), max(t6,t7))
+     */
+    private static class TopicNode {
+        public final String topicName;
+        public final Set<TopicsInfoNode> upStreams; // upStream TopicsInfo's sinkTopics contains this
+        public final Set<TopicsInfoNode> downStreams; // downStreams TopicsInfo's sourceTopics contains this
+        public Optional<Integer> numOfRepartitions;
+        TopicNode(final String topicName) {
+            this.topicName = topicName;
+            this.upStreams = new HashSet<>();
+            this.downStreams = new HashSet<>();
+            this.numOfRepartitions = Optional.empty();
+        }
+
+        public void addUpStreamTopicsInfo(final TopicsInfo topicsInfo) {
+            this.upStreams.add(new TopicsInfoNode(topicsInfo));
+        }
+
+        public void addDownStreamTopicsInfo(final TopicsInfo topicsInfo) {
+            this.downStreams.add(new TopicsInfoNode(topicsInfo));
+        }
+    }
+
+    // Node wrapper for TopicsInfo, which can be used together with TopicNode to build a graph to calculate
+    // numOfRepartitions, and numOfRepartitions of underlying TopicsInfo can be cached for memoization.
+    private static class TopicsInfoNode {
+        public TopicsInfo topicsInfo;
+        private Optional<Integer> numOfRepartitions;
+        TopicsInfoNode(final TopicsInfo topicsInfo) {
+            this.topicsInfo = topicsInfo;
+            this.numOfRepartitions = Optional.empty();
+        }
+
+        public void setNumOfRepartitions(final int numOfRepartitions) {

Review comment:
       nit: we could require non-negative value for `numOfRepartitions`

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##########
@@ -1695,6 +1698,60 @@ public void shouldThrowIfV2SubscriptionAndFutureSubscriptionIsMixed() {
         shouldThrowIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(2);
     }
 
+    @Test
+    public void shouldCorrectlySetRepartitionTopicMetadataNumberOfPartitions() {
+        // Test out a topology built with 3 levels of sub-topology as below:
+        //         input-t1   input-t2
+        //           |           |
+        //        topicInfo0 topicInfo4
+        //        /        \  /
+        //       t1         t2
+        //        |          |
+        // topicInfo1     topicInfo2
+        //        |           |
+        //      t1_1         t2_1
+        //         \         /
+        //         topicsInfo3
+        // t1, t2... are topics are the sourceTopics/sinkTopics of these sub-topologies, numberOfPartitions of a given
+        // topic should be the maximum of its upstream topics' numberOfPartitions.
+        // For example(use NoP instead of numberOfPartitions for simplicity):

Review comment:
       s/ `NoP instead of numberOfPartitions for simplicity` / `NoP = numberOfPartitions`

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -147,6 +147,71 @@ public String toString() {
         }
     }
 
+    /**
+     * A TopicNode is a node that contains topic information and upstream/downstream TopicsInfo. Graph built of TopicNode and TopicsInfoNode is useful
+     * when in certain cases traverse is needed. For example, {@link #setRepartitionTopicMetadataNumberOfPartitions(Map, Map, Cluster)}
+     * internally do a DFS search along with the graph.
+     *
+     TopicNode("t1")      TopicNode("t2")                                    TopicNode("t6")             TopicNode("t7")
+                \           /                                                            \                           /
+           TopicsInfoNode(source = (t1,t2), sink = (t3,t4))                           TopicsInfoNode(source = (t6,t7), sink = (t4))
+                                /           \                                                                        /
+                             /                 \                                                          /
+                          /                        \                                           /
+                      /                                \                           /
+                 /                                       \            /
+     TopicNode("t3")                                     TopicNode("t4")
+            \
+     TopicsInfoNode(source = (t3), sink = ())
+
+     t3 = max(t1,t2)
+     t4 = max(max(t1,t2), max(t6,t7))
+     */
+    private static class TopicNode {
+        public final String topicName;
+        public final Set<TopicsInfoNode> upStreams; // upStream TopicsInfo's sinkTopics contains this
+        public final Set<TopicsInfoNode> downStreams; // downStreams TopicsInfo's sourceTopics contains this
+        public Optional<Integer> numOfRepartitions;
+        TopicNode(final String topicName) {
+            this.topicName = topicName;
+            this.upStreams = new HashSet<>();
+            this.downStreams = new HashSet<>();
+            this.numOfRepartitions = Optional.empty();
+        }
+
+        public void addUpStreamTopicsInfo(final TopicsInfo topicsInfo) {
+            this.upStreams.add(new TopicsInfoNode(topicsInfo));
+        }
+
+        public void addDownStreamTopicsInfo(final TopicsInfo topicsInfo) {
+            this.downStreams.add(new TopicsInfoNode(topicsInfo));
+        }
+    }
+
+    // Node wrapper for TopicsInfo, which can be used together with TopicNode to build a graph to calculate
+    // numOfRepartitions, and numOfRepartitions of underlying TopicsInfo can be cached for memoization.

Review comment:
       `...build a graph to calculate partition number of repartition topic, and numOfRepartitions of underlying TopicsInfo is used for memoization.`

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -147,6 +147,71 @@ public String toString() {
         }
     }
 
+    /**
+     * A TopicNode is a node that contains topic information and upstream/downstream TopicsInfo. Graph built of TopicNode and TopicsInfoNode is useful

Review comment:
       Like we discussed offline, the downstream is actually not necessary.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] feyman2016 commented on pull request #8832: KAFKA-9377: Refactor StreamsPartitionAssignor Repartition Count Logic

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on pull request #8832:
URL: https://github.com/apache/kafka/pull/8832#issuecomment-650535027


   @abbccdda I merged the trunk and found that `StreamsPartitionAssignor` has been updated by other PRs so it's `ClassFanOutComplexity` goes up to 49, and it will definitely exceed the upper limit 50 with my change, so I just introduced an inner class `NumOfRepartitionsCalculator` to encapsulate the new algo to workaround the `ClassFanOutComplexity` limitation.
   Now we can come back to the most important issue I mentioned above, do we have the assumption that we won't have cycles in a graph built with `TopicsNode` and `TopicsInfoNode`? I will still try to find the answer for that but would be good if someone knows more details about it


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] feyman2016 commented on a change in pull request #8832: KAFKA-9377: Refactor StreamsPartitionAssignor Repartition Count Logic

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on a change in pull request #8832:
URL: https://github.com/apache/kafka/pull/8832#discussion_r446507613



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -147,6 +147,71 @@ public String toString() {
         }
     }
 
+    /**
+     * A TopicNode is a node that contains topic information and upstream/downstream TopicsInfo. Graph built of TopicNode and TopicsInfoNode is useful
+     * when in certain cases traverse is needed. For example, {@link #setRepartitionTopicMetadataNumberOfPartitions(Map, Map, Cluster)}
+     * internally do a DFS search along with the graph.
+     *
+     TopicNode("t1")      TopicNode("t2")                                    TopicNode("t6")             TopicNode("t7")
+                \           /                                                            \                           /
+           TopicsInfoNode(source = (t1,t2), sink = (t3,t4))                           TopicsInfoNode(source = (t6,t7), sink = (t4))
+                                /           \                                                                        /
+                             /                 \                                                          /
+                          /                        \                                           /
+                      /                                \                           /
+                 /                                       \            /
+     TopicNode("t3")                                     TopicNode("t4")
+            \
+     TopicsInfoNode(source = (t3), sink = ())
+
+     t3 = max(t1,t2)
+     t4 = max(max(t1,t2), max(t6,t7))
+     */
+    private static class TopicNode {
+        public final String topicName;
+        public final Set<TopicsInfoNode> upStreams; // upStream TopicsInfo's sinkTopics contains this
+        public final Set<TopicsInfoNode> downStreams; // downStreams TopicsInfo's sourceTopics contains this
+        public Optional<Integer> numOfRepartitions;
+        TopicNode(final String topicName) {
+            this.topicName = topicName;
+            this.upStreams = new HashSet<>();
+            this.downStreams = new HashSet<>();
+            this.numOfRepartitions = Optional.empty();
+        }
+
+        public void addUpStreamTopicsInfo(final TopicsInfo topicsInfo) {
+            this.upStreams.add(new TopicsInfoNode(topicsInfo));
+        }
+
+        public void addDownStreamTopicsInfo(final TopicsInfo topicsInfo) {
+            this.downStreams.add(new TopicsInfoNode(topicsInfo));
+        }
+    }
+
+    // Node wrapper for TopicsInfo, which can be used together with TopicNode to build a graph to calculate
+    // numOfRepartitions, and numOfRepartitions of underlying TopicsInfo can be cached for memoization.

Review comment:
       fixed

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -147,6 +147,71 @@ public String toString() {
         }
     }
 
+    /**
+     * A TopicNode is a node that contains topic information and upstream/downstream TopicsInfo. Graph built of TopicNode and TopicsInfoNode is useful
+     * when in certain cases traverse is needed. For example, {@link #setRepartitionTopicMetadataNumberOfPartitions(Map, Map, Cluster)}
+     * internally do a DFS search along with the graph.
+     *
+     TopicNode("t1")      TopicNode("t2")                                    TopicNode("t6")             TopicNode("t7")
+                \           /                                                            \                           /
+           TopicsInfoNode(source = (t1,t2), sink = (t3,t4))                           TopicsInfoNode(source = (t6,t7), sink = (t4))
+                                /           \                                                                        /
+                             /                 \                                                          /
+                          /                        \                                           /
+                      /                                \                           /
+                 /                                       \            /
+     TopicNode("t3")                                     TopicNode("t4")
+            \
+     TopicsInfoNode(source = (t3), sink = ())
+
+     t3 = max(t1,t2)
+     t4 = max(max(t1,t2), max(t6,t7))
+     */
+    private static class TopicNode {
+        public final String topicName;
+        public final Set<TopicsInfoNode> upStreams; // upStream TopicsInfo's sinkTopics contains this
+        public final Set<TopicsInfoNode> downStreams; // downStreams TopicsInfo's sourceTopics contains this
+        public Optional<Integer> numOfRepartitions;
+        TopicNode(final String topicName) {
+            this.topicName = topicName;
+            this.upStreams = new HashSet<>();
+            this.downStreams = new HashSet<>();
+            this.numOfRepartitions = Optional.empty();
+        }
+
+        public void addUpStreamTopicsInfo(final TopicsInfo topicsInfo) {
+            this.upStreams.add(new TopicsInfoNode(topicsInfo));
+        }
+
+        public void addDownStreamTopicsInfo(final TopicsInfo topicsInfo) {
+            this.downStreams.add(new TopicsInfoNode(topicsInfo));
+        }
+    }
+
+    // Node wrapper for TopicsInfo, which can be used together with TopicNode to build a graph to calculate
+    // numOfRepartitions, and numOfRepartitions of underlying TopicsInfo can be cached for memoization.
+    private static class TopicsInfoNode {
+        public TopicsInfo topicsInfo;
+        private Optional<Integer> numOfRepartitions;
+        TopicsInfoNode(final TopicsInfo topicsInfo) {
+            this.topicsInfo = topicsInfo;
+            this.numOfRepartitions = Optional.empty();
+        }
+
+        public void setNumOfRepartitions(final int numOfRepartitions) {

Review comment:
       fixed

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -511,68 +576,86 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion,
     /**
      * Computes the number of partitions and sets it for each repartition topic in repartitionTopicMetadata
      */
-    private void setRepartitionTopicMetadataNumberOfPartitions(final Map<String, InternalTopicConfig> repartitionTopicMetadata,
+    // visible for testing
+    public void setRepartitionTopicMetadataNumberOfPartitions(final Map<String, InternalTopicConfig> repartitionTopicMetadata,

Review comment:
       sure

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##########
@@ -1695,6 +1698,60 @@ public void shouldThrowIfV2SubscriptionAndFutureSubscriptionIsMixed() {
         shouldThrowIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(2);
     }
 
+    @Test
+    public void shouldCorrectlySetRepartitionTopicMetadataNumberOfPartitions() {
+        // Test out a topology built with 3 levels of sub-topology as below:
+        //         input-t1   input-t2
+        //           |           |
+        //        topicInfo0 topicInfo4
+        //        /        \  /
+        //       t1         t2
+        //        |          |
+        // topicInfo1     topicInfo2
+        //        |           |
+        //      t1_1         t2_1
+        //         \         /
+        //         topicsInfo3
+        // t1, t2... are topics are the sourceTopics/sinkTopics of these sub-topologies, numberOfPartitions of a given
+        // topic should be the maximum of its upstream topics' numberOfPartitions.
+        // For example(use NoP instead of numberOfPartitions for simplicity):

Review comment:
       updated




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] feyman2016 commented on a change in pull request #8832: KAFKA-9377: Refactor StreamsPartitionAssignor Repartition Count Logic

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on a change in pull request #8832:
URL: https://github.com/apache/kafka/pull/8832#discussion_r446507665



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -147,6 +147,71 @@ public String toString() {
         }
     }
 
+    /**
+     * A TopicNode is a node that contains topic information and upstream/downstream TopicsInfo. Graph built of TopicNode and TopicsInfoNode is useful

Review comment:
       yeah, absolutely, updated :)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #8832: KAFKA-9377: Refactor StreamsPartitionAssignor Repartition Count Logic

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #8832:
URL: https://github.com/apache/kafka/pull/8832#discussion_r439565299



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -514,65 +554,81 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion,
     private void setRepartitionTopicMetadataNumberOfPartitions(final Map<String, InternalTopicConfig> repartitionTopicMetadata,
                                                                final Map<Integer, TopicsInfo> topicGroups,
                                                                final Cluster metadata) {
-        boolean numPartitionsNeeded;
-        do {
-            numPartitionsNeeded = false;
-
-            for (final TopicsInfo topicsInfo : topicGroups.values()) {
-                for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
-                    final Optional<Integer> maybeNumPartitions = repartitionTopicMetadata.get(topicName)
-                                                                     .numberOfPartitions();
-                    Integer numPartitions = null;
-
-                    if (!maybeNumPartitions.isPresent()) {
-                        // try set the number of partitions for this repartition topic if it is not set yet
-                        for (final TopicsInfo otherTopicsInfo : topicGroups.values()) {
-                            final Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
-
-                            if (otherSinkTopics.contains(topicName)) {
-                                // if this topic is one of the sink topics of this topology,
-                                // use the maximum of all its source topic partitions as the number of partitions
-                                for (final String sourceTopicName : otherTopicsInfo.sourceTopics) {
-                                    Integer numPartitionsCandidate = null;
-                                    // It is possible the sourceTopic is another internal topic, i.e,
-                                    // map().join().join(map())
-                                    if (repartitionTopicMetadata.containsKey(sourceTopicName)) {
-                                        if (repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()) {
-                                            numPartitionsCandidate =
-                                                repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get();
-                                        }
-                                    } else {
-                                        final Integer count = metadata.partitionCountForTopic(sourceTopicName);
-                                        if (count == null) {
-                                            throw new IllegalStateException(
-                                                "No partition count found for source topic "
-                                                    + sourceTopicName
-                                                    + ", but it should have been."
-                                            );
-                                        }
-                                        numPartitionsCandidate = count;
-                                    }
-
-                                    if (numPartitionsCandidate != null) {
-                                        if (numPartitions == null || numPartitionsCandidate > numPartitions) {
-                                            numPartitions = numPartitionsCandidate;
-                                        }
-                                    }
-                                }
-                            }
-                        }
-
-                        // if we still have not found the right number of partitions,
-                        // another iteration is needed
-                        if (numPartitions == null) {
-                            numPartitionsNeeded = true;
-                        } else {
-                            repartitionTopicMetadata.get(topicName).setNumberOfPartitions(numPartitions);
-                        }
-                    }
+        final Set<String> allRepartitionSourceTopics = new HashSet<>();
+        final Map<String, TopicNode> builtTopicNodes = new HashMap<>();
+        // 1. Build a graph containing the TopicsInfo and TopicsNode
+        for (final TopicsInfo topicsInfo : topicGroups.values()) {
+            allRepartitionSourceTopics.addAll(topicsInfo.repartitionSourceTopics.keySet());
+            for (final String sourceTopic : topicsInfo.sourceTopics) {
+                builtTopicNodes.computeIfAbsent(sourceTopic, topic -> new TopicNode(topic));
+                builtTopicNodes.get(sourceTopic).addDownStreamTopicsInfo(topicsInfo);
+            }
+
+            for (final String sinkTopic : topicsInfo.sinkTopics) {
+                builtTopicNodes.computeIfAbsent(sinkTopic, topic -> new TopicNode(topic));
+                builtTopicNodes.get(sinkTopic).addUpStreamTopicsInfo(topicsInfo);
+            }
+        }
+
+        // 2. Use DFS along with memoization to calc repartition number of all repartitionSourceTopics
+        for (final String topic : allRepartitionSourceTopics) {
+            calcRepartitionNumForTopic(topic, repartitionTopicMetadata, metadata, builtTopicNodes, new HashMap<TopicsInfo, Integer>());
+        }
+    }
+
+    private int calcRepartitionNumForTopic(final String topic,
+                                           final Map<String, InternalTopicConfig> repartitionTopicMetadata,
+                                           final Cluster metadata,
+                                           final Map<String, TopicNode> builtTopicNodes,
+                                           final Map<TopicsInfo, Integer> topicsInfoNumberOfPartitions) {
+
+        if (repartitionTopicMetadata.containsKey(topic)) {
+            final Optional<Integer> maybeNumberPartitions = repartitionTopicMetadata.get(topic).numberOfPartitions();
+            // if numberOfPartitions already calculated, return directly
+            if (maybeNumberPartitions.isPresent()) {
+                return maybeNumberPartitions.get();
+            } else {
+                // else calculate the max numRepartitions of its upStream TopicsInfo and set the repartitionTopicMetadata for memoization before return
+                final TopicNode topicNode = builtTopicNodes.get(topic);
+                Integer maxNumberPartitions = 0;

Review comment:
       We could just use int.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -147,6 +147,46 @@ public String toString() {
         }
     }
 
+    /**
+     * TopicNode is the a topic node abstraction for graph built with TopicNode and TopicsInfo, the graph is useful
+     * when in certain cases traverse is needed. For example, method setRepartitionTopicMetadataNumberOfPartitions
+     * internally do a DFS search along with the graph.
+     *
+     TopicNode("t1")      TopicNode("t2")                                    TopicNode("t6")             TopicNode("t7")
+                \           /                                                            \                           /
+                  TopicsInfo(source = (t1,t2), sink = (t3,t4))                           TopicsInfo(source = (t6,t7), sink = (t4))
+                                /           \                                                                        /
+                             /                 \                                                          /
+                          /                        \                                           /
+                      /                                \                           /
+                 /                                       \            /
+     TopicNode("t3")                                     TopicNode("t4")
+            \
+     TopicsInfo(source = (t3), sink = ())
+
+     t3 = max(t1,t2)
+     t4 = max(max(t1,t2), max(t6,t7))
+     */
+    private static class TopicNode {
+        public final String topicName;
+        public final Set<TopicsInfo> upStreams; // upStream TopicsInfo's sinkTopics contains this
+        public final Set<TopicsInfo> downStreams; // downStreams TopicsInfo's sourceTopics contains this

Review comment:
       Are you sure? I don't see this struct being used for read anywhere.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] feyman2016 commented on a change in pull request #8832: KAFKA-9377: Refactor StreamsPartitionAssignor Repartition Count Logic

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on a change in pull request #8832:
URL: https://github.com/apache/kafka/pull/8832#discussion_r443097860



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -514,65 +554,81 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion,
     private void setRepartitionTopicMetadataNumberOfPartitions(final Map<String, InternalTopicConfig> repartitionTopicMetadata,
                                                                final Map<Integer, TopicsInfo> topicGroups,
                                                                final Cluster metadata) {
-        boolean numPartitionsNeeded;
-        do {
-            numPartitionsNeeded = false;
-
-            for (final TopicsInfo topicsInfo : topicGroups.values()) {
-                for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
-                    final Optional<Integer> maybeNumPartitions = repartitionTopicMetadata.get(topicName)
-                                                                     .numberOfPartitions();
-                    Integer numPartitions = null;
-
-                    if (!maybeNumPartitions.isPresent()) {
-                        // try set the number of partitions for this repartition topic if it is not set yet
-                        for (final TopicsInfo otherTopicsInfo : topicGroups.values()) {
-                            final Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
-
-                            if (otherSinkTopics.contains(topicName)) {
-                                // if this topic is one of the sink topics of this topology,
-                                // use the maximum of all its source topic partitions as the number of partitions
-                                for (final String sourceTopicName : otherTopicsInfo.sourceTopics) {
-                                    Integer numPartitionsCandidate = null;
-                                    // It is possible the sourceTopic is another internal topic, i.e,
-                                    // map().join().join(map())
-                                    if (repartitionTopicMetadata.containsKey(sourceTopicName)) {
-                                        if (repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()) {
-                                            numPartitionsCandidate =
-                                                repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get();
-                                        }
-                                    } else {
-                                        final Integer count = metadata.partitionCountForTopic(sourceTopicName);
-                                        if (count == null) {
-                                            throw new IllegalStateException(
-                                                "No partition count found for source topic "
-                                                    + sourceTopicName
-                                                    + ", but it should have been."
-                                            );
-                                        }
-                                        numPartitionsCandidate = count;
-                                    }
-
-                                    if (numPartitionsCandidate != null) {
-                                        if (numPartitions == null || numPartitionsCandidate > numPartitions) {
-                                            numPartitions = numPartitionsCandidate;
-                                        }
-                                    }
-                                }
-                            }
-                        }
-
-                        // if we still have not found the right number of partitions,
-                        // another iteration is needed
-                        if (numPartitions == null) {
-                            numPartitionsNeeded = true;
-                        } else {
-                            repartitionTopicMetadata.get(topicName).setNumberOfPartitions(numPartitions);
-                        }
-                    }
+        final Set<String> allRepartitionSourceTopics = new HashSet<>();
+        final Map<String, TopicNode> builtTopicNodes = new HashMap<>();
+        // 1. Build a graph containing the TopicsInfo and TopicsNode
+        for (final TopicsInfo topicsInfo : topicGroups.values()) {
+            allRepartitionSourceTopics.addAll(topicsInfo.repartitionSourceTopics.keySet());
+            for (final String sourceTopic : topicsInfo.sourceTopics) {
+                builtTopicNodes.computeIfAbsent(sourceTopic, topic -> new TopicNode(topic));
+                builtTopicNodes.get(sourceTopic).addDownStreamTopicsInfo(topicsInfo);
+            }
+
+            for (final String sinkTopic : topicsInfo.sinkTopics) {
+                builtTopicNodes.computeIfAbsent(sinkTopic, topic -> new TopicNode(topic));
+                builtTopicNodes.get(sinkTopic).addUpStreamTopicsInfo(topicsInfo);
+            }
+        }
+
+        // 2. Use DFS along with memoization to calc repartition number of all repartitionSourceTopics

Review comment:
       fixed




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #8832: KAFKA-9377: Refactor StreamsPartitionAssignor Repartition Count Logic

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #8832:
URL: https://github.com/apache/kafka/pull/8832#discussion_r438274945



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -514,65 +554,81 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion,
     private void setRepartitionTopicMetadataNumberOfPartitions(final Map<String, InternalTopicConfig> repartitionTopicMetadata,
                                                                final Map<Integer, TopicsInfo> topicGroups,
                                                                final Cluster metadata) {
-        boolean numPartitionsNeeded;
-        do {
-            numPartitionsNeeded = false;
-
-            for (final TopicsInfo topicsInfo : topicGroups.values()) {
-                for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
-                    final Optional<Integer> maybeNumPartitions = repartitionTopicMetadata.get(topicName)
-                                                                     .numberOfPartitions();
-                    Integer numPartitions = null;
-
-                    if (!maybeNumPartitions.isPresent()) {
-                        // try set the number of partitions for this repartition topic if it is not set yet
-                        for (final TopicsInfo otherTopicsInfo : topicGroups.values()) {
-                            final Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
-
-                            if (otherSinkTopics.contains(topicName)) {
-                                // if this topic is one of the sink topics of this topology,
-                                // use the maximum of all its source topic partitions as the number of partitions
-                                for (final String sourceTopicName : otherTopicsInfo.sourceTopics) {
-                                    Integer numPartitionsCandidate = null;
-                                    // It is possible the sourceTopic is another internal topic, i.e,
-                                    // map().join().join(map())
-                                    if (repartitionTopicMetadata.containsKey(sourceTopicName)) {
-                                        if (repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()) {
-                                            numPartitionsCandidate =
-                                                repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get();
-                                        }
-                                    } else {
-                                        final Integer count = metadata.partitionCountForTopic(sourceTopicName);
-                                        if (count == null) {
-                                            throw new IllegalStateException(
-                                                "No partition count found for source topic "
-                                                    + sourceTopicName
-                                                    + ", but it should have been."
-                                            );
-                                        }
-                                        numPartitionsCandidate = count;
-                                    }
-
-                                    if (numPartitionsCandidate != null) {
-                                        if (numPartitions == null || numPartitionsCandidate > numPartitions) {
-                                            numPartitions = numPartitionsCandidate;
-                                        }
-                                    }
-                                }
-                            }
-                        }
-
-                        // if we still have not found the right number of partitions,
-                        // another iteration is needed
-                        if (numPartitions == null) {
-                            numPartitionsNeeded = true;
-                        } else {
-                            repartitionTopicMetadata.get(topicName).setNumberOfPartitions(numPartitions);
-                        }
-                    }
+        final Set<String> allRepartitionSourceTopics = new HashSet<>();
+        final Map<String, TopicNode> builtTopicNodes = new HashMap<>();
+        // 1. Build a graph containing the TopicsInfo and TopicsNode
+        for (final TopicsInfo topicsInfo : topicGroups.values()) {
+            allRepartitionSourceTopics.addAll(topicsInfo.repartitionSourceTopics.keySet());
+            for (final String sourceTopic : topicsInfo.sourceTopics) {
+                builtTopicNodes.computeIfAbsent(sourceTopic, topic -> new TopicNode(topic));
+                builtTopicNodes.get(sourceTopic).addDownStreamTopicsInfo(topicsInfo);
+            }
+
+            for (final String sinkTopic : topicsInfo.sinkTopics) {
+                builtTopicNodes.computeIfAbsent(sinkTopic, topic -> new TopicNode(topic));
+                builtTopicNodes.get(sinkTopic).addUpStreamTopicsInfo(topicsInfo);
+            }
+        }
+
+        // 2. Use DFS along with memoization to calc repartition number of all repartitionSourceTopics

Review comment:
       calc -> calculate




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on pull request #8832: KAFKA-9377: Refactor StreamsPartitionAssignor Repartition Count Logic

Posted by GitBox <gi...@apache.org>.
abbccdda commented on pull request #8832:
URL: https://github.com/apache/kafka/pull/8832#issuecomment-647809159


   test this please


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] feyman2016 commented on pull request #8832: KAFKA-9377: Refactor StreamsPartitionAssignor Repartition Count Logic

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on pull request #8832:
URL: https://github.com/apache/kafka/pull/8832#issuecomment-646943356


   @abbccdda Updated to fix the TopicsInfo non-keyable issue.
   However, this impl will not pass test: `KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable` and will stackoverflow because of infinite recursive call. Checked it a little bit, it is because there are cycle in the built graph:
   ```
   TopicsInfoA -> Internal topic 22 -> TopicsInfo B -> Internal topic 30 -> TopicsInfo A
   ```
   in the current impl, it won't fail because it will find the `numOfRepartitions` for a topics unless none of its upstream topics' `numOfRepartitions` can be found.
   I'm not sure if we have the assumption that there will be no cycle in the graph built with TopicsInfo and TopicsNode, if yes, I think the test `shouldInnerJoinMultiPartitionQueryable` should be fixed, if not, I will update the algo to consider the cycle handling.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] feyman2016 commented on a change in pull request #8832: KAFKA-9377: Refactor StreamsPartitionAssignor Repartition Count Logic

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on a change in pull request #8832:
URL: https://github.com/apache/kafka/pull/8832#discussion_r443097670



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -514,65 +554,81 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion,
     private void setRepartitionTopicMetadataNumberOfPartitions(final Map<String, InternalTopicConfig> repartitionTopicMetadata,
                                                                final Map<Integer, TopicsInfo> topicGroups,
                                                                final Cluster metadata) {
-        boolean numPartitionsNeeded;
-        do {
-            numPartitionsNeeded = false;
-
-            for (final TopicsInfo topicsInfo : topicGroups.values()) {
-                for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
-                    final Optional<Integer> maybeNumPartitions = repartitionTopicMetadata.get(topicName)
-                                                                     .numberOfPartitions();
-                    Integer numPartitions = null;
-
-                    if (!maybeNumPartitions.isPresent()) {
-                        // try set the number of partitions for this repartition topic if it is not set yet
-                        for (final TopicsInfo otherTopicsInfo : topicGroups.values()) {
-                            final Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
-
-                            if (otherSinkTopics.contains(topicName)) {
-                                // if this topic is one of the sink topics of this topology,
-                                // use the maximum of all its source topic partitions as the number of partitions
-                                for (final String sourceTopicName : otherTopicsInfo.sourceTopics) {
-                                    Integer numPartitionsCandidate = null;
-                                    // It is possible the sourceTopic is another internal topic, i.e,
-                                    // map().join().join(map())
-                                    if (repartitionTopicMetadata.containsKey(sourceTopicName)) {
-                                        if (repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()) {
-                                            numPartitionsCandidate =
-                                                repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get();
-                                        }
-                                    } else {
-                                        final Integer count = metadata.partitionCountForTopic(sourceTopicName);
-                                        if (count == null) {
-                                            throw new IllegalStateException(
-                                                "No partition count found for source topic "
-                                                    + sourceTopicName
-                                                    + ", but it should have been."
-                                            );
-                                        }
-                                        numPartitionsCandidate = count;
-                                    }
-
-                                    if (numPartitionsCandidate != null) {
-                                        if (numPartitions == null || numPartitionsCandidate > numPartitions) {
-                                            numPartitions = numPartitionsCandidate;
-                                        }
-                                    }
-                                }
-                            }
-                        }
-
-                        // if we still have not found the right number of partitions,
-                        // another iteration is needed
-                        if (numPartitions == null) {
-                            numPartitionsNeeded = true;
-                        } else {
-                            repartitionTopicMetadata.get(topicName).setNumberOfPartitions(numPartitions);
-                        }
-                    }
+        final Set<String> allRepartitionSourceTopics = new HashSet<>();
+        final Map<String, TopicNode> builtTopicNodes = new HashMap<>();
+        // 1. Build a graph containing the TopicsInfo and TopicsNode
+        for (final TopicsInfo topicsInfo : topicGroups.values()) {
+            allRepartitionSourceTopics.addAll(topicsInfo.repartitionSourceTopics.keySet());
+            for (final String sourceTopic : topicsInfo.sourceTopics) {
+                builtTopicNodes.computeIfAbsent(sourceTopic, topic -> new TopicNode(topic));
+                builtTopicNodes.get(sourceTopic).addDownStreamTopicsInfo(topicsInfo);
+            }
+
+            for (final String sinkTopic : topicsInfo.sinkTopics) {
+                builtTopicNodes.computeIfAbsent(sinkTopic, topic -> new TopicNode(topic));
+                builtTopicNodes.get(sinkTopic).addUpStreamTopicsInfo(topicsInfo);
+            }
+        }
+
+        // 2. Use DFS along with memoization to calc repartition number of all repartitionSourceTopics
+        for (final String topic : allRepartitionSourceTopics) {
+            calcRepartitionNumForTopic(topic, repartitionTopicMetadata, metadata, builtTopicNodes, new HashMap<TopicsInfo, Integer>());
+        }
+    }
+
+    private int calcRepartitionNumForTopic(final String topic,
+                                           final Map<String, InternalTopicConfig> repartitionTopicMetadata,
+                                           final Cluster metadata,
+                                           final Map<String, TopicNode> builtTopicNodes,
+                                           final Map<TopicsInfo, Integer> topicsInfoNumberOfPartitions) {
+
+        if (repartitionTopicMetadata.containsKey(topic)) {
+            final Optional<Integer> maybeNumberPartitions = repartitionTopicMetadata.get(topic).numberOfPartitions();
+            // if numberOfPartitions already calculated, return directly
+            if (maybeNumberPartitions.isPresent()) {
+                return maybeNumberPartitions.get();
+            } else {
+                // else calculate the max numRepartitions of its upStream TopicsInfo and set the repartitionTopicMetadata for memoization before return
+                final TopicNode topicNode = builtTopicNodes.get(topic);
+                Integer maxNumberPartitions = 0;

Review comment:
       Updated this a little bit, use Integer initial `null` to represent uncalculated




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] feyman2016 commented on pull request #8832: KAFKA-9377: Refactor StreamsPartitionAssignor Repartition Count Logic

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on pull request #8832:
URL: https://github.com/apache/kafka/pull/8832#issuecomment-641392176


   @abbccdda Thanks for the comments, I update the PR with few comments and added algo description. Since the algo is still under discussion, I can add the unit tests later once we have the conclusion. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on pull request #8832: KAFKA-9377: Refactor StreamsPartitionAssignor Repartition Count Logic

Posted by GitBox <gi...@apache.org>.
abbccdda commented on pull request #8832:
URL: https://github.com/apache/kafka/pull/8832#issuecomment-647905475


   @feyman2016 Do you think we could have a fix for that? https://checkstyle.sourceforge.io/apidocs/com/puppycrawl/tools/checkstyle/checks/metrics/ClassFanOutComplexityCheck.html


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] feyman2016 commented on pull request #8832: KAFKA-9377: Refactor StreamsPartitionAssignor Repartition Count Logic

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on pull request #8832:
URL: https://github.com/apache/kafka/pull/8832#issuecomment-652231116


   @abbccdda Thanks! I also think it makes more sense to have no cycles with the Stream topology~


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] feyman2016 commented on a change in pull request #8832: KAFKA-9377: Refactor StreamsPartitionAssignor Repartition Count Logic

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on a change in pull request #8832:
URL: https://github.com/apache/kafka/pull/8832#discussion_r437532739



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -513,66 +553,94 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion,
      */
     private void setRepartitionTopicMetadataNumberOfPartitions(final Map<String, InternalTopicConfig> repartitionTopicMetadata,
                                                                final Map<Integer, TopicsInfo> topicGroups,
-                                                               final Cluster metadata) {
-        boolean numPartitionsNeeded;
-        do {
-            numPartitionsNeeded = false;
-
-            for (final TopicsInfo topicsInfo : topicGroups.values()) {
-                for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
-                    final Optional<Integer> maybeNumPartitions = repartitionTopicMetadata.get(topicName)
-                                                                     .numberOfPartitions();
-                    Integer numPartitions = null;
-
-                    if (!maybeNumPartitions.isPresent()) {
-                        // try set the number of partitions for this repartition topic if it is not set yet
-                        for (final TopicsInfo otherTopicsInfo : topicGroups.values()) {
-                            final Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
-
-                            if (otherSinkTopics.contains(topicName)) {
-                                // if this topic is one of the sink topics of this topology,
-                                // use the maximum of all its source topic partitions as the number of partitions
-                                for (final String sourceTopicName : otherTopicsInfo.sourceTopics) {
-                                    Integer numPartitionsCandidate = null;
-                                    // It is possible the sourceTopic is another internal topic, i.e,
-                                    // map().join().join(map())
-                                    if (repartitionTopicMetadata.containsKey(sourceTopicName)) {
-                                        if (repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()) {
-                                            numPartitionsCandidate =
-                                                repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get();
-                                        }
-                                    } else {
-                                        final Integer count = metadata.partitionCountForTopic(sourceTopicName);
-                                        if (count == null) {
-                                            throw new IllegalStateException(
-                                                "No partition count found for source topic "
-                                                    + sourceTopicName
-                                                    + ", but it should have been."
-                                            );
-                                        }
-                                        numPartitionsCandidate = count;
-                                    }
-
-                                    if (numPartitionsCandidate != null) {
-                                        if (numPartitions == null || numPartitionsCandidate > numPartitions) {
-                                            numPartitions = numPartitionsCandidate;
-                                        }
-                                    }
-                                }
-                            }
-                        }
-
-                        // if we still have not found the right number of partitions,
-                        // another iteration is needed
-                        if (numPartitions == null) {
-                            numPartitionsNeeded = true;
-                        } else {
-                            repartitionTopicMetadata.get(topicName).setNumberOfPartitions(numPartitions);
-                        }
-                    }
+                                                                final Cluster metadata) {
+        final Set<String> allRepartitionSourceTopics = new HashSet<>();
+        final Map<String, TopicNode> builtTopicNodes = new HashMap<>();
+        // 1.  Build a graph contains the TopicsInfo and TopicsNode
+        for (final TopicsInfo topicsInfo : topicGroups.values()) {
+            for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
+                allRepartitionSourceTopics.add(topicName);
+            }
+            for (final String sourceTopic : topicsInfo.sourceTopics) {
+                if (builtTopicNodes.containsKey(sourceTopic)) {
+                    builtTopicNodes.get(sourceTopic).addDownStreamTopicsInfo(topicsInfo);
+                } else {
+                    final TopicNode topicNode = new TopicNode(sourceTopic);
+                    topicNode.addDownStreamTopicsInfo(topicsInfo);
+                    builtTopicNodes.put(sourceTopic, topicNode);
+                }
+            }
+
+            for (final String sinkTopic : topicsInfo.sinkTopics) {
+                if (builtTopicNodes.containsKey(sinkTopic)) {
+                    builtTopicNodes.get(sinkTopic).addUpStreamTopicsInfo(topicsInfo);
+                } else {
+                    final TopicNode topicNode = new TopicNode(sinkTopic);
+                    topicNode.addUpStreamTopicsInfo(topicsInfo);
+                    builtTopicNodes.put(sinkTopic, topicNode);
                 }
             }
-        } while (numPartitionsNeeded);
+        }
+
+        // 2. Use DFS along with memoization to calc repartition number of all repartitionSourceTopics
+        for (final String topic : allRepartitionSourceTopics) {
+            calcRepartitionNumForTopic(topic, repartitionTopicMetadata, metadata, builtTopicNodes, new HashMap<TopicsInfo, Integer>());
+        }
+    }
+
+    private int calcRepartitionNumForTopic(final String topic,
+                                           final Map<String, InternalTopicConfig> repartitionTopicMetadata,
+                                           final Cluster metadata,
+                                           final Map<String, TopicNode> builtTopicNodes,
+                                           final Map<TopicsInfo, Integer> topicsInfoNumberOfPartitions) {
+
+        if (repartitionTopicMetadata.containsKey(topic)) {
+            final Optional<Integer> maybeNumberPartitions = repartitionTopicMetadata.get(topic).numberOfPartitions();
+            // if numberOfPartitions already calculated, return directly
+            if (maybeNumberPartitions.isPresent()) {
+                return maybeNumberPartitions.get();
+            } else {
+                // else calculate the max numRepartitions of its upStream TopicsInfo and set the repartitionTopicMetadata for memoization before return
+                final TopicNode topicNode = builtTopicNodes.get(topic);
+                Integer maxNumberPartitions = 0;
+                for (final TopicsInfo upstream : topicNode.upStreams) {
+                    final Integer upStreamRepartitionNum = calcRepartitionNumForTopicInfo(upstream, repartitionTopicMetadata,
+                            metadata, builtTopicNodes, topicsInfoNumberOfPartitions);
+                    maxNumberPartitions = upStreamRepartitionNum > maxNumberPartitions ? upStreamRepartitionNum : maxNumberPartitions;
+                }
+                repartitionTopicMetadata.get(topic).setNumberOfPartitions(maxNumberPartitions);
+                return maxNumberPartitions;
+            }
+        } else {
+            final Integer count = metadata.partitionCountForTopic(topic);
+            if (count == null) {
+                throw new IllegalStateException(
+                        "No partition count found for source topic "
+                                + topic
+                                + ", but it should have been."
+                );
+            }
+            return count;
+        }
+    }
+
+    private int calcRepartitionNumForTopicInfo(final TopicsInfo topicsInfo,

Review comment:
       Yeah, seems to be the case, but in the mutual recursive function, it seems acceptable...
   Anyway, I can rename them later if think the algo is finalized




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #8832: KAFKA-9377: Refactor StreamsPartitionAssignor Repartition Count Logic

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #8832:
URL: https://github.com/apache/kafka/pull/8832#discussion_r436913810



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -147,6 +147,46 @@ public String toString() {
         }
     }
 
+    /**
+     * TopicNode is the a topic node abstraction for graph built with TopicNode and TopicsInfo, the graph is useful
+     * when in certain cases traverse is needed. For example, method setRepartitionTopicMetadataNumberOfPartitions
+     * internally do a DFS search along with the graph.
+     *
+     TopicNode("t1")      TopicNode("t2")                                    TopicNode("t6")             TopicNode("t7")
+                \           /                                                            \                           /
+                  TopicsInfo(source = (t1,t2), sink = (t3,t4))                           TopicsInfo(source = (t6,t7), sink = (t4))
+                                /           \                                                                        /
+                             /                 \                                                          /
+                          /                        \                                           /
+                      /                                \                           /
+                 /                                       \            /
+     TopicNode("t3")                                     TopicNode("t4")
+            \
+     TopicsInfo(source = (t3), sink = ())
+
+     t3 = max(t1,t2)
+     t4 = max(max(t1,t2), max(t6,t7))
+     */
+    private static class TopicNode {
+        public final String topicName;
+        public final Set<TopicsInfo> upStreams; // upStream TopicsInfo's sinkTopics contains this
+        public final Set<TopicsInfo> downStreams; // downStreams TopicsInfo's sourceTopics contains this

Review comment:
       I haven't seen this struct being used.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -513,66 +553,94 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion,
      */
     private void setRepartitionTopicMetadataNumberOfPartitions(final Map<String, InternalTopicConfig> repartitionTopicMetadata,
                                                                final Map<Integer, TopicsInfo> topicGroups,
-                                                               final Cluster metadata) {
-        boolean numPartitionsNeeded;
-        do {
-            numPartitionsNeeded = false;
-
-            for (final TopicsInfo topicsInfo : topicGroups.values()) {
-                for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
-                    final Optional<Integer> maybeNumPartitions = repartitionTopicMetadata.get(topicName)
-                                                                     .numberOfPartitions();
-                    Integer numPartitions = null;
-
-                    if (!maybeNumPartitions.isPresent()) {
-                        // try set the number of partitions for this repartition topic if it is not set yet
-                        for (final TopicsInfo otherTopicsInfo : topicGroups.values()) {
-                            final Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
-
-                            if (otherSinkTopics.contains(topicName)) {
-                                // if this topic is one of the sink topics of this topology,
-                                // use the maximum of all its source topic partitions as the number of partitions
-                                for (final String sourceTopicName : otherTopicsInfo.sourceTopics) {
-                                    Integer numPartitionsCandidate = null;
-                                    // It is possible the sourceTopic is another internal topic, i.e,
-                                    // map().join().join(map())
-                                    if (repartitionTopicMetadata.containsKey(sourceTopicName)) {
-                                        if (repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()) {
-                                            numPartitionsCandidate =
-                                                repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get();
-                                        }
-                                    } else {
-                                        final Integer count = metadata.partitionCountForTopic(sourceTopicName);
-                                        if (count == null) {
-                                            throw new IllegalStateException(
-                                                "No partition count found for source topic "
-                                                    + sourceTopicName
-                                                    + ", but it should have been."
-                                            );
-                                        }
-                                        numPartitionsCandidate = count;
-                                    }
-
-                                    if (numPartitionsCandidate != null) {
-                                        if (numPartitions == null || numPartitionsCandidate > numPartitions) {
-                                            numPartitions = numPartitionsCandidate;
-                                        }
-                                    }
-                                }
-                            }
-                        }
-
-                        // if we still have not found the right number of partitions,
-                        // another iteration is needed
-                        if (numPartitions == null) {
-                            numPartitionsNeeded = true;
-                        } else {
-                            repartitionTopicMetadata.get(topicName).setNumberOfPartitions(numPartitions);
-                        }
-                    }
+                                                                final Cluster metadata) {
+        final Set<String> allRepartitionSourceTopics = new HashSet<>();
+        final Map<String, TopicNode> builtTopicNodes = new HashMap<>();
+        // 1.  Build a graph contains the TopicsInfo and TopicsNode
+        for (final TopicsInfo topicsInfo : topicGroups.values()) {
+            for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {

Review comment:
       Could replace with addAll:
   `allRepartitionSourceTopics.addAll(topicsInfo.repartitionSourceTopics.keySet());`
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -147,6 +147,46 @@ public String toString() {
         }
     }
 
+    /**
+     * TopicNode is the a topic node abstraction for graph built with TopicNode and TopicsInfo, the graph is useful

Review comment:
       `with TopicNode and TopicsInfo` looks weird to phrase like this when we are defining `TopicNode`, maybe something like `with topic information associated with each node`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -513,66 +553,94 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion,
      */
     private void setRepartitionTopicMetadataNumberOfPartitions(final Map<String, InternalTopicConfig> repartitionTopicMetadata,
                                                                final Map<Integer, TopicsInfo> topicGroups,
-                                                               final Cluster metadata) {
-        boolean numPartitionsNeeded;
-        do {
-            numPartitionsNeeded = false;
-
-            for (final TopicsInfo topicsInfo : topicGroups.values()) {
-                for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
-                    final Optional<Integer> maybeNumPartitions = repartitionTopicMetadata.get(topicName)
-                                                                     .numberOfPartitions();
-                    Integer numPartitions = null;
-
-                    if (!maybeNumPartitions.isPresent()) {
-                        // try set the number of partitions for this repartition topic if it is not set yet
-                        for (final TopicsInfo otherTopicsInfo : topicGroups.values()) {
-                            final Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
-
-                            if (otherSinkTopics.contains(topicName)) {
-                                // if this topic is one of the sink topics of this topology,
-                                // use the maximum of all its source topic partitions as the number of partitions
-                                for (final String sourceTopicName : otherTopicsInfo.sourceTopics) {
-                                    Integer numPartitionsCandidate = null;
-                                    // It is possible the sourceTopic is another internal topic, i.e,
-                                    // map().join().join(map())
-                                    if (repartitionTopicMetadata.containsKey(sourceTopicName)) {
-                                        if (repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()) {
-                                            numPartitionsCandidate =
-                                                repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get();
-                                        }
-                                    } else {
-                                        final Integer count = metadata.partitionCountForTopic(sourceTopicName);
-                                        if (count == null) {
-                                            throw new IllegalStateException(
-                                                "No partition count found for source topic "
-                                                    + sourceTopicName
-                                                    + ", but it should have been."
-                                            );
-                                        }
-                                        numPartitionsCandidate = count;
-                                    }
-
-                                    if (numPartitionsCandidate != null) {
-                                        if (numPartitions == null || numPartitionsCandidate > numPartitions) {
-                                            numPartitions = numPartitionsCandidate;
-                                        }
-                                    }
-                                }
-                            }
-                        }
-
-                        // if we still have not found the right number of partitions,
-                        // another iteration is needed
-                        if (numPartitions == null) {
-                            numPartitionsNeeded = true;
-                        } else {
-                            repartitionTopicMetadata.get(topicName).setNumberOfPartitions(numPartitions);
-                        }
-                    }
+                                                                final Cluster metadata) {
+        final Set<String> allRepartitionSourceTopics = new HashSet<>();
+        final Map<String, TopicNode> builtTopicNodes = new HashMap<>();
+        // 1.  Build a graph contains the TopicsInfo and TopicsNode
+        for (final TopicsInfo topicsInfo : topicGroups.values()) {
+            for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
+                allRepartitionSourceTopics.add(topicName);
+            }
+            for (final String sourceTopic : topicsInfo.sourceTopics) {
+                if (builtTopicNodes.containsKey(sourceTopic)) {
+                    builtTopicNodes.get(sourceTopic).addDownStreamTopicsInfo(topicsInfo);
+                } else {
+                    final TopicNode topicNode = new TopicNode(sourceTopic);
+                    topicNode.addDownStreamTopicsInfo(topicsInfo);
+                    builtTopicNodes.put(sourceTopic, topicNode);
+                }
+            }
+
+            for (final String sinkTopic : topicsInfo.sinkTopics) {
+                if (builtTopicNodes.containsKey(sinkTopic)) {
+                    builtTopicNodes.get(sinkTopic).addUpStreamTopicsInfo(topicsInfo);
+                } else {
+                    final TopicNode topicNode = new TopicNode(sinkTopic);
+                    topicNode.addUpStreamTopicsInfo(topicsInfo);
+                    builtTopicNodes.put(sinkTopic, topicNode);
                 }
             }
-        } while (numPartitionsNeeded);
+        }
+
+        // 2. Use DFS along with memoization to calc repartition number of all repartitionSourceTopics
+        for (final String topic : allRepartitionSourceTopics) {
+            calcRepartitionNumForTopic(topic, repartitionTopicMetadata, metadata, builtTopicNodes, new HashMap<TopicsInfo, Integer>());
+        }
+    }
+
+    private int calcRepartitionNumForTopic(final String topic,
+                                           final Map<String, InternalTopicConfig> repartitionTopicMetadata,
+                                           final Cluster metadata,
+                                           final Map<String, TopicNode> builtTopicNodes,
+                                           final Map<TopicsInfo, Integer> topicsInfoNumberOfPartitions) {

Review comment:
       Do we need `topicsInfoNumberOfPartitions` as parameter? It is reset to new map for every call.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -513,66 +553,94 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion,
      */
     private void setRepartitionTopicMetadataNumberOfPartitions(final Map<String, InternalTopicConfig> repartitionTopicMetadata,
                                                                final Map<Integer, TopicsInfo> topicGroups,
-                                                               final Cluster metadata) {
-        boolean numPartitionsNeeded;
-        do {
-            numPartitionsNeeded = false;
-
-            for (final TopicsInfo topicsInfo : topicGroups.values()) {
-                for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
-                    final Optional<Integer> maybeNumPartitions = repartitionTopicMetadata.get(topicName)
-                                                                     .numberOfPartitions();
-                    Integer numPartitions = null;
-
-                    if (!maybeNumPartitions.isPresent()) {
-                        // try set the number of partitions for this repartition topic if it is not set yet
-                        for (final TopicsInfo otherTopicsInfo : topicGroups.values()) {
-                            final Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
-
-                            if (otherSinkTopics.contains(topicName)) {
-                                // if this topic is one of the sink topics of this topology,
-                                // use the maximum of all its source topic partitions as the number of partitions
-                                for (final String sourceTopicName : otherTopicsInfo.sourceTopics) {
-                                    Integer numPartitionsCandidate = null;
-                                    // It is possible the sourceTopic is another internal topic, i.e,
-                                    // map().join().join(map())
-                                    if (repartitionTopicMetadata.containsKey(sourceTopicName)) {
-                                        if (repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()) {
-                                            numPartitionsCandidate =
-                                                repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get();
-                                        }
-                                    } else {
-                                        final Integer count = metadata.partitionCountForTopic(sourceTopicName);
-                                        if (count == null) {
-                                            throw new IllegalStateException(
-                                                "No partition count found for source topic "
-                                                    + sourceTopicName
-                                                    + ", but it should have been."
-                                            );
-                                        }
-                                        numPartitionsCandidate = count;
-                                    }
-
-                                    if (numPartitionsCandidate != null) {
-                                        if (numPartitions == null || numPartitionsCandidate > numPartitions) {
-                                            numPartitions = numPartitionsCandidate;
-                                        }
-                                    }
-                                }
-                            }
-                        }
-
-                        // if we still have not found the right number of partitions,
-                        // another iteration is needed
-                        if (numPartitions == null) {
-                            numPartitionsNeeded = true;
-                        } else {
-                            repartitionTopicMetadata.get(topicName).setNumberOfPartitions(numPartitions);
-                        }
-                    }
+                                                                final Cluster metadata) {
+        final Set<String> allRepartitionSourceTopics = new HashSet<>();
+        final Map<String, TopicNode> builtTopicNodes = new HashMap<>();
+        // 1.  Build a graph contains the TopicsInfo and TopicsNode
+        for (final TopicsInfo topicsInfo : topicGroups.values()) {
+            for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
+                allRepartitionSourceTopics.add(topicName);
+            }
+            for (final String sourceTopic : topicsInfo.sourceTopics) {
+                if (builtTopicNodes.containsKey(sourceTopic)) {
+                    builtTopicNodes.get(sourceTopic).addDownStreamTopicsInfo(topicsInfo);
+                } else {
+                    final TopicNode topicNode = new TopicNode(sourceTopic);
+                    topicNode.addDownStreamTopicsInfo(topicsInfo);
+                    builtTopicNodes.put(sourceTopic, topicNode);
+                }
+            }
+
+            for (final String sinkTopic : topicsInfo.sinkTopics) {
+                if (builtTopicNodes.containsKey(sinkTopic)) {
+                    builtTopicNodes.get(sinkTopic).addUpStreamTopicsInfo(topicsInfo);
+                } else {
+                    final TopicNode topicNode = new TopicNode(sinkTopic);
+                    topicNode.addUpStreamTopicsInfo(topicsInfo);
+                    builtTopicNodes.put(sinkTopic, topicNode);
                 }
             }
-        } while (numPartitionsNeeded);
+        }
+
+        // 2. Use DFS along with memoization to calc repartition number of all repartitionSourceTopics
+        for (final String topic : allRepartitionSourceTopics) {
+            calcRepartitionNumForTopic(topic, repartitionTopicMetadata, metadata, builtTopicNodes, new HashMap<TopicsInfo, Integer>());
+        }
+    }
+
+    private int calcRepartitionNumForTopic(final String topic,
+                                           final Map<String, InternalTopicConfig> repartitionTopicMetadata,
+                                           final Cluster metadata,
+                                           final Map<String, TopicNode> builtTopicNodes,
+                                           final Map<TopicsInfo, Integer> topicsInfoNumberOfPartitions) {
+
+        if (repartitionTopicMetadata.containsKey(topic)) {
+            final Optional<Integer> maybeNumberPartitions = repartitionTopicMetadata.get(topic).numberOfPartitions();
+            // if numberOfPartitions already calculated, return directly
+            if (maybeNumberPartitions.isPresent()) {
+                return maybeNumberPartitions.get();
+            } else {
+                // else calculate the max numRepartitions of its upStream TopicsInfo and set the repartitionTopicMetadata for memoization before return
+                final TopicNode topicNode = builtTopicNodes.get(topic);
+                Integer maxNumberPartitions = 0;
+                for (final TopicsInfo upstream : topicNode.upStreams) {
+                    final Integer upStreamRepartitionNum = calcRepartitionNumForTopicInfo(upstream, repartitionTopicMetadata,
+                            metadata, builtTopicNodes, topicsInfoNumberOfPartitions);
+                    maxNumberPartitions = upStreamRepartitionNum > maxNumberPartitions ? upStreamRepartitionNum : maxNumberPartitions;
+                }
+                repartitionTopicMetadata.get(topic).setNumberOfPartitions(maxNumberPartitions);
+                return maxNumberPartitions;
+            }
+        } else {
+            final Integer count = metadata.partitionCountForTopic(topic);
+            if (count == null) {
+                throw new IllegalStateException(
+                        "No partition count found for source topic "
+                                + topic
+                                + ", but it should have been."
+                );
+            }
+            return count;
+        }
+    }
+
+    private int calcRepartitionNumForTopicInfo(final TopicsInfo topicsInfo,

Review comment:
       `calcRepartitionNumForTopic` and `calcRepartitionNumForTopicInfo` are pretty hard to differentiate, better to name it more distinct.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -513,66 +553,94 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion,
      */
     private void setRepartitionTopicMetadataNumberOfPartitions(final Map<String, InternalTopicConfig> repartitionTopicMetadata,
                                                                final Map<Integer, TopicsInfo> topicGroups,
-                                                               final Cluster metadata) {
-        boolean numPartitionsNeeded;
-        do {
-            numPartitionsNeeded = false;
-
-            for (final TopicsInfo topicsInfo : topicGroups.values()) {
-                for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
-                    final Optional<Integer> maybeNumPartitions = repartitionTopicMetadata.get(topicName)
-                                                                     .numberOfPartitions();
-                    Integer numPartitions = null;
-
-                    if (!maybeNumPartitions.isPresent()) {
-                        // try set the number of partitions for this repartition topic if it is not set yet
-                        for (final TopicsInfo otherTopicsInfo : topicGroups.values()) {
-                            final Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
-
-                            if (otherSinkTopics.contains(topicName)) {
-                                // if this topic is one of the sink topics of this topology,
-                                // use the maximum of all its source topic partitions as the number of partitions
-                                for (final String sourceTopicName : otherTopicsInfo.sourceTopics) {
-                                    Integer numPartitionsCandidate = null;
-                                    // It is possible the sourceTopic is another internal topic, i.e,
-                                    // map().join().join(map())
-                                    if (repartitionTopicMetadata.containsKey(sourceTopicName)) {
-                                        if (repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()) {
-                                            numPartitionsCandidate =
-                                                repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get();
-                                        }
-                                    } else {
-                                        final Integer count = metadata.partitionCountForTopic(sourceTopicName);
-                                        if (count == null) {
-                                            throw new IllegalStateException(
-                                                "No partition count found for source topic "
-                                                    + sourceTopicName
-                                                    + ", but it should have been."
-                                            );
-                                        }
-                                        numPartitionsCandidate = count;
-                                    }
-
-                                    if (numPartitionsCandidate != null) {
-                                        if (numPartitions == null || numPartitionsCandidate > numPartitions) {
-                                            numPartitions = numPartitionsCandidate;
-                                        }
-                                    }
-                                }
-                            }
-                        }
-
-                        // if we still have not found the right number of partitions,
-                        // another iteration is needed
-                        if (numPartitions == null) {
-                            numPartitionsNeeded = true;
-                        } else {
-                            repartitionTopicMetadata.get(topicName).setNumberOfPartitions(numPartitions);
-                        }
-                    }
+                                                                final Cluster metadata) {
+        final Set<String> allRepartitionSourceTopics = new HashSet<>();
+        final Map<String, TopicNode> builtTopicNodes = new HashMap<>();
+        // 1.  Build a graph contains the TopicsInfo and TopicsNode
+        for (final TopicsInfo topicsInfo : topicGroups.values()) {
+            for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
+                allRepartitionSourceTopics.add(topicName);
+            }
+            for (final String sourceTopic : topicsInfo.sourceTopics) {
+                if (builtTopicNodes.containsKey(sourceTopic)) {
+                    builtTopicNodes.get(sourceTopic).addDownStreamTopicsInfo(topicsInfo);
+                } else {
+                    final TopicNode topicNode = new TopicNode(sourceTopic);
+                    topicNode.addDownStreamTopicsInfo(topicsInfo);
+                    builtTopicNodes.put(sourceTopic, topicNode);
+                }
+            }
+
+            for (final String sinkTopic : topicsInfo.sinkTopics) {
+                if (builtTopicNodes.containsKey(sinkTopic)) {

Review comment:
       Same here

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -513,66 +553,94 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion,
      */
     private void setRepartitionTopicMetadataNumberOfPartitions(final Map<String, InternalTopicConfig> repartitionTopicMetadata,
                                                                final Map<Integer, TopicsInfo> topicGroups,
-                                                               final Cluster metadata) {
-        boolean numPartitionsNeeded;
-        do {
-            numPartitionsNeeded = false;
-
-            for (final TopicsInfo topicsInfo : topicGroups.values()) {
-                for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
-                    final Optional<Integer> maybeNumPartitions = repartitionTopicMetadata.get(topicName)
-                                                                     .numberOfPartitions();
-                    Integer numPartitions = null;
-
-                    if (!maybeNumPartitions.isPresent()) {
-                        // try set the number of partitions for this repartition topic if it is not set yet
-                        for (final TopicsInfo otherTopicsInfo : topicGroups.values()) {
-                            final Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
-
-                            if (otherSinkTopics.contains(topicName)) {
-                                // if this topic is one of the sink topics of this topology,
-                                // use the maximum of all its source topic partitions as the number of partitions
-                                for (final String sourceTopicName : otherTopicsInfo.sourceTopics) {
-                                    Integer numPartitionsCandidate = null;
-                                    // It is possible the sourceTopic is another internal topic, i.e,
-                                    // map().join().join(map())
-                                    if (repartitionTopicMetadata.containsKey(sourceTopicName)) {
-                                        if (repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()) {
-                                            numPartitionsCandidate =
-                                                repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get();
-                                        }
-                                    } else {
-                                        final Integer count = metadata.partitionCountForTopic(sourceTopicName);
-                                        if (count == null) {
-                                            throw new IllegalStateException(
-                                                "No partition count found for source topic "
-                                                    + sourceTopicName
-                                                    + ", but it should have been."
-                                            );
-                                        }
-                                        numPartitionsCandidate = count;
-                                    }
-
-                                    if (numPartitionsCandidate != null) {
-                                        if (numPartitions == null || numPartitionsCandidate > numPartitions) {
-                                            numPartitions = numPartitionsCandidate;
-                                        }
-                                    }
-                                }
-                            }
-                        }
-
-                        // if we still have not found the right number of partitions,
-                        // another iteration is needed
-                        if (numPartitions == null) {
-                            numPartitionsNeeded = true;
-                        } else {
-                            repartitionTopicMetadata.get(topicName).setNumberOfPartitions(numPartitions);
-                        }
-                    }
+                                                                final Cluster metadata) {
+        final Set<String> allRepartitionSourceTopics = new HashSet<>();
+        final Map<String, TopicNode> builtTopicNodes = new HashMap<>();
+        // 1.  Build a graph contains the TopicsInfo and TopicsNode
+        for (final TopicsInfo topicsInfo : topicGroups.values()) {
+            for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
+                allRepartitionSourceTopics.add(topicName);
+            }
+            for (final String sourceTopic : topicsInfo.sourceTopics) {
+                if (builtTopicNodes.containsKey(sourceTopic)) {
+                    builtTopicNodes.get(sourceTopic).addDownStreamTopicsInfo(topicsInfo);
+                } else {
+                    final TopicNode topicNode = new TopicNode(sourceTopic);
+                    topicNode.addDownStreamTopicsInfo(topicsInfo);
+                    builtTopicNodes.put(sourceTopic, topicNode);
+                }
+            }
+
+            for (final String sinkTopic : topicsInfo.sinkTopics) {
+                if (builtTopicNodes.containsKey(sinkTopic)) {
+                    builtTopicNodes.get(sinkTopic).addUpStreamTopicsInfo(topicsInfo);
+                } else {
+                    final TopicNode topicNode = new TopicNode(sinkTopic);
+                    topicNode.addUpStreamTopicsInfo(topicsInfo);
+                    builtTopicNodes.put(sinkTopic, topicNode);
                 }
             }
-        } while (numPartitionsNeeded);
+        }
+
+        // 2. Use DFS along with memoization to calc repartition number of all repartitionSourceTopics

Review comment:
       memoization -> memorization ?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -147,6 +147,46 @@ public String toString() {
         }
     }
 
+    /**
+     * TopicNode is the a topic node abstraction for graph built with TopicNode and TopicsInfo, the graph is useful

Review comment:
       is a topic node

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -147,6 +147,46 @@ public String toString() {
         }
     }
 
+    /**
+     * TopicNode is the a topic node abstraction for graph built with TopicNode and TopicsInfo, the graph is useful
+     * when in certain cases traverse is needed. For example, method setRepartitionTopicMetadataNumberOfPartitions

Review comment:
       do a link: 
   `method setRepartitionTopicMetadataNumberOfPartitions` -> `{@link #setRepartitionTopicMetadataNumberOfPartitions}`

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -513,66 +553,94 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion,
      */
     private void setRepartitionTopicMetadataNumberOfPartitions(final Map<String, InternalTopicConfig> repartitionTopicMetadata,
                                                                final Map<Integer, TopicsInfo> topicGroups,
-                                                               final Cluster metadata) {
-        boolean numPartitionsNeeded;
-        do {
-            numPartitionsNeeded = false;
-
-            for (final TopicsInfo topicsInfo : topicGroups.values()) {
-                for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
-                    final Optional<Integer> maybeNumPartitions = repartitionTopicMetadata.get(topicName)
-                                                                     .numberOfPartitions();
-                    Integer numPartitions = null;
-
-                    if (!maybeNumPartitions.isPresent()) {
-                        // try set the number of partitions for this repartition topic if it is not set yet
-                        for (final TopicsInfo otherTopicsInfo : topicGroups.values()) {
-                            final Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
-
-                            if (otherSinkTopics.contains(topicName)) {
-                                // if this topic is one of the sink topics of this topology,
-                                // use the maximum of all its source topic partitions as the number of partitions
-                                for (final String sourceTopicName : otherTopicsInfo.sourceTopics) {
-                                    Integer numPartitionsCandidate = null;
-                                    // It is possible the sourceTopic is another internal topic, i.e,
-                                    // map().join().join(map())
-                                    if (repartitionTopicMetadata.containsKey(sourceTopicName)) {
-                                        if (repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()) {
-                                            numPartitionsCandidate =
-                                                repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get();
-                                        }
-                                    } else {
-                                        final Integer count = metadata.partitionCountForTopic(sourceTopicName);
-                                        if (count == null) {
-                                            throw new IllegalStateException(
-                                                "No partition count found for source topic "
-                                                    + sourceTopicName
-                                                    + ", but it should have been."
-                                            );
-                                        }
-                                        numPartitionsCandidate = count;
-                                    }
-
-                                    if (numPartitionsCandidate != null) {
-                                        if (numPartitions == null || numPartitionsCandidate > numPartitions) {
-                                            numPartitions = numPartitionsCandidate;
-                                        }
-                                    }
-                                }
-                            }
-                        }
-
-                        // if we still have not found the right number of partitions,
-                        // another iteration is needed
-                        if (numPartitions == null) {
-                            numPartitionsNeeded = true;
-                        } else {
-                            repartitionTopicMetadata.get(topicName).setNumberOfPartitions(numPartitions);
-                        }
-                    }
+                                                                final Cluster metadata) {

Review comment:
       nit: format

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -513,66 +553,94 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion,
      */
     private void setRepartitionTopicMetadataNumberOfPartitions(final Map<String, InternalTopicConfig> repartitionTopicMetadata,
                                                                final Map<Integer, TopicsInfo> topicGroups,
-                                                               final Cluster metadata) {
-        boolean numPartitionsNeeded;
-        do {
-            numPartitionsNeeded = false;
-
-            for (final TopicsInfo topicsInfo : topicGroups.values()) {
-                for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
-                    final Optional<Integer> maybeNumPartitions = repartitionTopicMetadata.get(topicName)
-                                                                     .numberOfPartitions();
-                    Integer numPartitions = null;
-
-                    if (!maybeNumPartitions.isPresent()) {
-                        // try set the number of partitions for this repartition topic if it is not set yet
-                        for (final TopicsInfo otherTopicsInfo : topicGroups.values()) {
-                            final Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
-
-                            if (otherSinkTopics.contains(topicName)) {
-                                // if this topic is one of the sink topics of this topology,
-                                // use the maximum of all its source topic partitions as the number of partitions
-                                for (final String sourceTopicName : otherTopicsInfo.sourceTopics) {
-                                    Integer numPartitionsCandidate = null;
-                                    // It is possible the sourceTopic is another internal topic, i.e,
-                                    // map().join().join(map())
-                                    if (repartitionTopicMetadata.containsKey(sourceTopicName)) {
-                                        if (repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()) {
-                                            numPartitionsCandidate =
-                                                repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get();
-                                        }
-                                    } else {
-                                        final Integer count = metadata.partitionCountForTopic(sourceTopicName);
-                                        if (count == null) {
-                                            throw new IllegalStateException(
-                                                "No partition count found for source topic "
-                                                    + sourceTopicName
-                                                    + ", but it should have been."
-                                            );
-                                        }
-                                        numPartitionsCandidate = count;
-                                    }
-
-                                    if (numPartitionsCandidate != null) {
-                                        if (numPartitions == null || numPartitionsCandidate > numPartitions) {
-                                            numPartitions = numPartitionsCandidate;
-                                        }
-                                    }
-                                }
-                            }
-                        }
-
-                        // if we still have not found the right number of partitions,
-                        // another iteration is needed
-                        if (numPartitions == null) {
-                            numPartitionsNeeded = true;
-                        } else {
-                            repartitionTopicMetadata.get(topicName).setNumberOfPartitions(numPartitions);
-                        }
-                    }
+                                                                final Cluster metadata) {
+        final Set<String> allRepartitionSourceTopics = new HashSet<>();
+        final Map<String, TopicNode> builtTopicNodes = new HashMap<>();
+        // 1.  Build a graph contains the TopicsInfo and TopicsNode
+        for (final TopicsInfo topicsInfo : topicGroups.values()) {
+            for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
+                allRepartitionSourceTopics.add(topicName);
+            }
+            for (final String sourceTopic : topicsInfo.sourceTopics) {
+                if (builtTopicNodes.containsKey(sourceTopic)) {

Review comment:
       we could use `computeIfAbsent`

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -513,66 +553,94 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion,
      */
     private void setRepartitionTopicMetadataNumberOfPartitions(final Map<String, InternalTopicConfig> repartitionTopicMetadata,
                                                                final Map<Integer, TopicsInfo> topicGroups,
-                                                               final Cluster metadata) {
-        boolean numPartitionsNeeded;
-        do {
-            numPartitionsNeeded = false;
-
-            for (final TopicsInfo topicsInfo : topicGroups.values()) {
-                for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
-                    final Optional<Integer> maybeNumPartitions = repartitionTopicMetadata.get(topicName)
-                                                                     .numberOfPartitions();
-                    Integer numPartitions = null;
-
-                    if (!maybeNumPartitions.isPresent()) {
-                        // try set the number of partitions for this repartition topic if it is not set yet
-                        for (final TopicsInfo otherTopicsInfo : topicGroups.values()) {
-                            final Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
-
-                            if (otherSinkTopics.contains(topicName)) {
-                                // if this topic is one of the sink topics of this topology,
-                                // use the maximum of all its source topic partitions as the number of partitions
-                                for (final String sourceTopicName : otherTopicsInfo.sourceTopics) {
-                                    Integer numPartitionsCandidate = null;
-                                    // It is possible the sourceTopic is another internal topic, i.e,
-                                    // map().join().join(map())
-                                    if (repartitionTopicMetadata.containsKey(sourceTopicName)) {
-                                        if (repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()) {
-                                            numPartitionsCandidate =
-                                                repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get();
-                                        }
-                                    } else {
-                                        final Integer count = metadata.partitionCountForTopic(sourceTopicName);
-                                        if (count == null) {
-                                            throw new IllegalStateException(
-                                                "No partition count found for source topic "
-                                                    + sourceTopicName
-                                                    + ", but it should have been."
-                                            );
-                                        }
-                                        numPartitionsCandidate = count;
-                                    }
-
-                                    if (numPartitionsCandidate != null) {
-                                        if (numPartitions == null || numPartitionsCandidate > numPartitions) {
-                                            numPartitions = numPartitionsCandidate;
-                                        }
-                                    }
-                                }
-                            }
-                        }
-
-                        // if we still have not found the right number of partitions,
-                        // another iteration is needed
-                        if (numPartitions == null) {
-                            numPartitionsNeeded = true;
-                        } else {
-                            repartitionTopicMetadata.get(topicName).setNumberOfPartitions(numPartitions);
-                        }
-                    }
+                                                                final Cluster metadata) {
+        final Set<String> allRepartitionSourceTopics = new HashSet<>();
+        final Map<String, TopicNode> builtTopicNodes = new HashMap<>();
+        // 1.  Build a graph contains the TopicsInfo and TopicsNode

Review comment:
       `a graph containing`, and correct space between `1. Build`

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -513,66 +553,94 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion,
      */
     private void setRepartitionTopicMetadataNumberOfPartitions(final Map<String, InternalTopicConfig> repartitionTopicMetadata,
                                                                final Map<Integer, TopicsInfo> topicGroups,
-                                                               final Cluster metadata) {
-        boolean numPartitionsNeeded;
-        do {
-            numPartitionsNeeded = false;
-
-            for (final TopicsInfo topicsInfo : topicGroups.values()) {
-                for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
-                    final Optional<Integer> maybeNumPartitions = repartitionTopicMetadata.get(topicName)
-                                                                     .numberOfPartitions();
-                    Integer numPartitions = null;
-
-                    if (!maybeNumPartitions.isPresent()) {
-                        // try set the number of partitions for this repartition topic if it is not set yet
-                        for (final TopicsInfo otherTopicsInfo : topicGroups.values()) {
-                            final Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
-
-                            if (otherSinkTopics.contains(topicName)) {
-                                // if this topic is one of the sink topics of this topology,
-                                // use the maximum of all its source topic partitions as the number of partitions
-                                for (final String sourceTopicName : otherTopicsInfo.sourceTopics) {
-                                    Integer numPartitionsCandidate = null;
-                                    // It is possible the sourceTopic is another internal topic, i.e,
-                                    // map().join().join(map())
-                                    if (repartitionTopicMetadata.containsKey(sourceTopicName)) {
-                                        if (repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()) {
-                                            numPartitionsCandidate =
-                                                repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get();
-                                        }
-                                    } else {
-                                        final Integer count = metadata.partitionCountForTopic(sourceTopicName);
-                                        if (count == null) {
-                                            throw new IllegalStateException(
-                                                "No partition count found for source topic "
-                                                    + sourceTopicName
-                                                    + ", but it should have been."
-                                            );
-                                        }
-                                        numPartitionsCandidate = count;
-                                    }
-
-                                    if (numPartitionsCandidate != null) {
-                                        if (numPartitions == null || numPartitionsCandidate > numPartitions) {
-                                            numPartitions = numPartitionsCandidate;
-                                        }
-                                    }
-                                }
-                            }
-                        }
-
-                        // if we still have not found the right number of partitions,
-                        // another iteration is needed
-                        if (numPartitions == null) {
-                            numPartitionsNeeded = true;
-                        } else {
-                            repartitionTopicMetadata.get(topicName).setNumberOfPartitions(numPartitions);
-                        }
-                    }
+                                                                final Cluster metadata) {
+        final Set<String> allRepartitionSourceTopics = new HashSet<>();
+        final Map<String, TopicNode> builtTopicNodes = new HashMap<>();
+        // 1.  Build a graph contains the TopicsInfo and TopicsNode
+        for (final TopicsInfo topicsInfo : topicGroups.values()) {
+            for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
+                allRepartitionSourceTopics.add(topicName);
+            }
+            for (final String sourceTopic : topicsInfo.sourceTopics) {
+                if (builtTopicNodes.containsKey(sourceTopic)) {
+                    builtTopicNodes.get(sourceTopic).addDownStreamTopicsInfo(topicsInfo);
+                } else {
+                    final TopicNode topicNode = new TopicNode(sourceTopic);
+                    topicNode.addDownStreamTopicsInfo(topicsInfo);
+                    builtTopicNodes.put(sourceTopic, topicNode);
+                }
+            }
+
+            for (final String sinkTopic : topicsInfo.sinkTopics) {
+                if (builtTopicNodes.containsKey(sinkTopic)) {
+                    builtTopicNodes.get(sinkTopic).addUpStreamTopicsInfo(topicsInfo);
+                } else {
+                    final TopicNode topicNode = new TopicNode(sinkTopic);
+                    topicNode.addUpStreamTopicsInfo(topicsInfo);
+                    builtTopicNodes.put(sinkTopic, topicNode);
                 }
             }
-        } while (numPartitionsNeeded);
+        }
+
+        // 2. Use DFS along with memoization to calc repartition number of all repartitionSourceTopics
+        for (final String topic : allRepartitionSourceTopics) {
+            calcRepartitionNumForTopic(topic, repartitionTopicMetadata, metadata, builtTopicNodes, new HashMap<TopicsInfo, Integer>());
+        }
+    }
+
+    private int calcRepartitionNumForTopic(final String topic,
+                                           final Map<String, InternalTopicConfig> repartitionTopicMetadata,
+                                           final Cluster metadata,
+                                           final Map<String, TopicNode> builtTopicNodes,
+                                           final Map<TopicsInfo, Integer> topicsInfoNumberOfPartitions) {

Review comment:
       Does `TopicsInfo` work for a map key? Looking at its override equals, it doesn't seem to check all fields for equality.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] feyman2016 commented on pull request #8832: KAFKA-9377: Refactor StreamsPartitionAssignor Repartition Count Logic

Posted by GitBox <gi...@apache.org>.
feyman2016 commented on pull request #8832:
URL: https://github.com/apache/kafka/pull/8832#issuecomment-645347696


   @abbccdda Sorry for having being silent for several days, I have some local update to fix the issue that `TopicsInfo` is not appropriate for map key, but I have been caught in too many flaky integration tests locally... so haven't push them till now, will update soon


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org