Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/04/25 08:47:45 UTC

[GitHub] [kafka] zhaohaidao opened a new pull request #8550: KAFKA-9850 Move KStream#repartition operator validation during Topolo…

zhaohaidao opened a new pull request #8550:
URL: https://github.com/apache/kafka/pull/8550


   Tickets: KAFKA-9850
   
   Move some of the repartition operator validation into topology.build().
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] zhaohaidao commented on pull request #8550: KAFKA-9850 Move KStream#repartition operator validation during Topolo…

Posted by GitBox <gi...@apache.org>.
zhaohaidao commented on pull request #8550:
URL: https://github.com/apache/kafka/pull/8550#issuecomment-620915590


   @abbccdda Hi, the PR has been updated. Could you continue reviewing it?





[GitHub] [kafka] mjsax commented on a change in pull request #8550: KAFKA-9850 Move KStream#repartition operator validation during Topolo…

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8550:
URL: https://github.com/apache/kafka/pull/8550#discussion_r418784189



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamRepartitionTest.java
##########
@@ -104,6 +114,54 @@ public void shouldInvokePartitionerWhenSet() {
         verify(streamPartitionerMock);
     }
 
+    @Test
+    public void shouldThrowAnExceptionWhenNumberOfPartitionsOfRepartitionOperationsDoNotMatchWhenJoining() {
+        final String topicB = "topic-b";
+        final String outputTopic = "topic-output";
+        final String topicBRepartitionedName = "topic-b-scale-up";
+        final String inputTopicRepartitionedName = "input-topic-scale-up";
+        final int topicBNumberOfPartitions = 2;
+        final int inputTopicNumberOfPartitions = 4;
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final Repartitioned<Integer, String> inputTopicRepartitioned = Repartitioned
+                .<Integer, String>as(inputTopicRepartitionedName)
+                .withNumberOfPartitions(inputTopicNumberOfPartitions);
+
+        final Repartitioned<Integer, String> topicBRepartitioned = Repartitioned
+                .<Integer, String>as(topicBRepartitionedName)
+                .withNumberOfPartitions(topicBNumberOfPartitions);
+
+        final KStream<Integer, String> topicBStream = builder
+                .stream(topicB, Consumed.with(Serdes.Integer(), Serdes.String()))
+                .repartition(topicBRepartitioned);
+
+        builder.stream(inputTopic, Consumed.with(Serdes.Integer(), Serdes.String()))
+                .repartition(inputTopicRepartitioned)
+                .join(topicBStream, (value1, value2) -> value2, JoinWindows.of(Duration.ofSeconds(10)))
+                .to(outputTopic);
+
+        final Map<String, Integer> repartitionTopicsWithNumOfPartitions = Utils.mkMap(
+                Utils.mkEntry(toRepartitionTopicName(topicBRepartitionedName), topicBNumberOfPartitions),
+                Utils.mkEntry(toRepartitionTopicName(inputTopicRepartitionedName), inputTopicNumberOfPartitions)
+        );
+
+        try {
+            builder.build(props);
+            Assert.fail();
+        } catch (final TopologyException t) {

Review comment:
       It's better to use:
   ```
   final TopologyException expected = assertThrows(
     TopologyException.class,
     () -> builder.build(props)
   );
   // put assertions here
   ```
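The suggested `assertThrows` comes from JUnit (`org.junit.Assert.assertThrows` in JUnit 4.13+, and `Assertions.assertThrows` in JUnit 5). The following is a minimal, self-contained sketch of the same idea showing why it beats try / `Assert.fail()` / catch: it fails when nothing is thrown, and it returns the caught exception so assertions can follow. `AssertThrowsSketch` and its nested `ThrowingRunnable` are hypothetical names, not the JUnit API.

```java
final class AssertThrowsSketch {

    /** Hypothetical stand-in for the code under test; it may throw anything. */
    @FunctionalInterface
    interface ThrowingRunnable {
        void run() throws Throwable;
    }

    static <T extends Throwable> T assertThrows(final Class<T> expectedType,
                                                final ThrowingRunnable body) {
        try {
            body.run();
        } catch (final Throwable actual) {
            if (expectedType.isInstance(actual)) {
                // Hand the exception back so the caller can assert on its message, cause, etc.
                return expectedType.cast(actual);
            }
            throw new AssertionError("Expected " + expectedType.getName()
                    + " but got " + actual.getClass().getName(), actual);
        }
        // Reached only if the body completed normally, which is itself a test failure.
        throw new AssertionError("Expected " + expectedType.getName() + " but nothing was thrown");
    }
}
```

In the test above, `builder.build(props)` would be the body and `TopologyException` the expected type, with assertions on the returned exception's message.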

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -633,6 +634,43 @@ public final void copartitionSources(final Collection<String> sourceNodes) {
         copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
     }
 
+    public void validateCopartition() {
+        final List<Set<String>> copartitionGroups =

Review comment:
       If I understand this code correct, `copartitionGroups` take the list of co-partitioned nodes (ie, processor names -> `copartitionSourceGroups`) and replaces each processor name (each processor should be a source-processor?) with the corresponding source topic name?
   
   If yes, a comment might be helpful. Also, rename `copartitionGroups` -> `allCopartitionedSourceTopics`?
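A minimal, self-contained sketch of the mapping described in this comment, assuming `nodeToSourceTopics` maps each source node name to the topics that node reads from. Plain collections stand in for the builder's internal fields, and the class name `CopartitionTopicsSketch` is hypothetical. Unlike the diff, which calls `get(node)` directly, this sketch uses `getOrDefault` so that a node without an entry contributes no topics instead of throwing.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

final class CopartitionTopicsSketch {

    /**
     * For each co-partition group of source node names, collect the names of the
     * source topics those nodes read from (one Set of topic names per group).
     */
    static List<Set<String>> allCopartitionedSourceTopics(
            final Collection<Set<String>> copartitionSourceGroups,
            final Map<String, List<String>> nodeToSourceTopics) {
        return copartitionSourceGroups.stream()
                .map(sourceGroup -> sourceGroup.stream()
                        // Replace each source node name with the topic(s) it subscribes to.
                        .flatMap(sourceNodeName -> nodeToSourceTopics
                                .getOrDefault(sourceNodeName, Collections.<String>emptyList())
                                .stream())
                        .collect(Collectors.toSet()))
                .collect(Collectors.toList());
    }
}
```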

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -633,6 +634,43 @@ public final void copartitionSources(final Collection<String> sourceNodes) {
         copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
     }
 
+    public void validateCopartition() {
+        final List<Set<String>> copartitionGroups =
+                copartitionSourceGroups
+                        .stream()
+                        .map(sourceGroup -> sourceGroup
+                                .stream()
+                                .flatMap(node -> nodeToSourceTopics.get(node).stream())

Review comment:
       rename `node` -> `sourceNodeName` ?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -633,6 +634,43 @@ public final void copartitionSources(final Collection<String> sourceNodes) {
         copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
     }
 
+    public void validateCopartition() {
+        final List<Set<String>> copartitionGroups =
+                copartitionSourceGroups
+                        .stream()
+                        .map(sourceGroup -> sourceGroup
+                                .stream()
+                                .flatMap(node -> nodeToSourceTopics.get(node).stream())
+                                .collect(Collectors.toSet())
+                        ).collect(Collectors.toList());
+        for (final Set<String> copartition : copartitionGroups) {
+            final Map<String, Integer> topicPartNum = new HashMap<>();
+            copartition.forEach(topic -> {
+                final InternalTopicProperties prop = internalTopicNamesWithProperties.get(topic);
+                if (prop != null && prop.getNumberOfPartitions().isPresent()) {
+                    topicPartNum.put(topic, prop.getNumberOfPartitions().get());
+                }
+            });
+            internalTopicNamesWithProperties.forEach((topic, prop) -> {
+                if (copartition.contains(topic) && prop.getNumberOfPartitions().isPresent()) {
+                    topicPartNum.put(topic, prop.getNumberOfPartitions().get());
+                }
+            });
+            if (copartition.equals(topicPartNum.keySet())) {
+                final Collection<Integer> partNums = topicPartNum.values();
+                final Integer first = partNums.iterator().next();

Review comment:
       How can we be sure that `partNums` is not empty? If empty, `next()` would throw.
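One way to address this concern, sketched under the assumption that an empty group should simply pass: check `hasNext()` before taking the first element. `SamePartitionCountSketch` is a hypothetical name, not code from the PR.

```java
import java.util.Collection;
import java.util.Iterator;

final class SamePartitionCountSketch {

    /**
     * Returns true if every value is equal. An empty collection is treated as
     * trivially consistent instead of calling iterator().next(), which would
     * throw NoSuchElementException.
     */
    static boolean allEqual(final Collection<Integer> numberOfPartitions) {
        final Iterator<Integer> it = numberOfPartitions.iterator();
        if (!it.hasNext()) {
            return true; // nothing to compare
        }
        final Integer first = it.next();
        while (it.hasNext()) {
            if (!first.equals(it.next())) {
                return false;
            }
        }
        return true;
    }
}
```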

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -633,6 +634,43 @@ public final void copartitionSources(final Collection<String> sourceNodes) {
         copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
     }
 
+    public void validateCopartition() {
+        final List<Set<String>> copartitionGroups =
+                copartitionSourceGroups
+                        .stream()
+                        .map(sourceGroup -> sourceGroup
+                                .stream()
+                                .flatMap(node -> nodeToSourceTopics.get(node).stream())
+                                .collect(Collectors.toSet())
+                        ).collect(Collectors.toList());
+        for (final Set<String> copartition : copartitionGroups) {
+            final Map<String, Integer> topicPartNum = new HashMap<>();

Review comment:
       What is `topicPartNum`? Please avoid abbreviations; they make the code hard to read.
   
   Should this be `numberOfPartitionsPerTopic`?
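A sketch of the collection step using the suggested name, assuming each internal topic's optional partition count is available as an `Optional<Integer>` (standing in for `InternalTopicProperties#getNumberOfPartitions()`). A single lookup per topic in the group is enough to build the map. `PartitionsPerTopicSketch` is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

final class PartitionsPerTopicSketch {

    /**
     * Collects the user-specified partition count for each topic in one co-partition group.
     * Topics that are not internal, or that have no explicit count, are left out of the result.
     */
    static Map<String, Integer> numberOfPartitionsPerTopic(
            final Set<String> copartitionedTopics,
            final Map<String, Optional<Integer>> internalTopicNumberOfPartitions) {
        final Map<String, Integer> numberOfPartitionsPerTopic = new HashMap<>();
        for (final String topic : copartitionedTopics) {
            internalTopicNumberOfPartitions
                    .getOrDefault(topic, Optional.empty())
                    .ifPresent(count -> numberOfPartitionsPerTopic.put(topic, count));
        }
        return numberOfPartitionsPerTopic;
    }
}
```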

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
##########
@@ -312,6 +312,7 @@ public void buildAndOptimizeTopology(final Properties props) {
                 graphNodePriorityQueue.offer(graphNode);
             }
         }
+        internalTopologyBuilder.validateCoPartition();

Review comment:
       I don't think so, because the later verification, which covers the case where the user specified all partition counts, is the same code that verifies co-partitioning based on the input topics' partition counts. We still need that check anyway.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -633,6 +634,43 @@ public final void copartitionSources(final Collection<String> sourceNodes) {
         copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
     }
 
+    public void validateCopartition() {
+        final List<Set<String>> copartitionGroups =
+                copartitionSourceGroups
+                        .stream()
+                        .map(sourceGroup -> sourceGroup
+                                .stream()
+                                .flatMap(node -> nodeToSourceTopics.get(node).stream())
+                                .collect(Collectors.toSet())
+                        ).collect(Collectors.toList());
+        for (final Set<String> copartition : copartitionGroups) {
+            final Map<String, Integer> topicPartNum = new HashMap<>();
+            copartition.forEach(topic -> {
+                final InternalTopicProperties prop = internalTopicNamesWithProperties.get(topic);
+                if (prop != null && prop.getNumberOfPartitions().isPresent()) {
+                    topicPartNum.put(topic, prop.getNumberOfPartitions().get());
+                }
+            });
+            internalTopicNamesWithProperties.forEach((topic, prop) -> {
+                if (copartition.contains(topic) && prop.getNumberOfPartitions().isPresent()) {
+                    topicPartNum.put(topic, prop.getNumberOfPartitions().get());
+                }
+            });
+            if (copartition.equals(topicPartNum.keySet())) {
+                final Collection<Integer> partNums = topicPartNum.values();
+                final Integer first = partNums.iterator().next();
+                for (final Integer partNum : partNums) {
+                    if (partNum.equals(first)) {

Review comment:
       Should this be `!partNum.equals(first)` ?
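A sketch of the corrected check, assuming a mismatch should fail the build: the error case is a count that does not equal the first one. `IllegalStateException` stands in for the `TopologyException` the test expects, and `CopartitionMismatchSketch` is a hypothetical name.

```java
import java.util.Map;

final class CopartitionMismatchSketch {

    /**
     * Throws if the topics of one co-partition group do not all share the same
     * partition count. An empty map passes trivially.
     */
    static void ensureSameNumberOfPartitions(final Map<String, Integer> numberOfPartitionsPerTopic) {
        Integer expected = null;
        String firstTopic = null;
        for (final Map.Entry<String, Integer> entry : numberOfPartitionsPerTopic.entrySet()) {
            if (expected == null) {
                // Remember the first topic's count as the reference value.
                expected = entry.getValue();
                firstTopic = entry.getKey();
            } else if (!entry.getValue().equals(expected)) {
                // Negated comparison: a count that differs is the error case.
                throw new IllegalStateException("Topics not co-partitioned: " + firstTopic
                        + " has " + expected + " partitions but " + entry.getKey()
                        + " has " + entry.getValue());
            }
        }
    }
}
```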

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -633,6 +634,43 @@ public final void copartitionSources(final Collection<String> sourceNodes) {
         copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
     }
 
+    public void validateCopartition() {
+        final List<Set<String>> copartitionGroups =
+                copartitionSourceGroups
+                        .stream()
+                        .map(sourceGroup -> sourceGroup
+                                .stream()
+                                .flatMap(node -> nodeToSourceTopics.get(node).stream())
+                                .collect(Collectors.toSet())
+                        ).collect(Collectors.toList());
+        for (final Set<String> copartition : copartitionGroups) {
+            final Map<String, Integer> topicPartNum = new HashMap<>();
+            copartition.forEach(topic -> {
+                final InternalTopicProperties prop = internalTopicNamesWithProperties.get(topic);
+                if (prop != null && prop.getNumberOfPartitions().isPresent()) {
+                    topicPartNum.put(topic, prop.getNumberOfPartitions().get());
+                }
+            });
+            internalTopicNamesWithProperties.forEach((topic, prop) -> {
+                if (copartition.contains(topic) && prop.getNumberOfPartitions().isPresent()) {
+                    topicPartNum.put(topic, prop.getNumberOfPartitions().get());

Review comment:
       Why do we add the partition numbers of all internal topics here? It seems redundant with the step above.







[GitHub] [kafka] zhaohaidao commented on a change in pull request #8550: KAFKA-9850 Move KStream#repartition operator validation during Topolo…

Posted by GitBox <gi...@apache.org>.
zhaohaidao commented on a change in pull request #8550:
URL: https://github.com/apache/kafka/pull/8550#discussion_r418816470



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -633,6 +634,43 @@ public final void copartitionSources(final Collection<String> sourceNodes) {
         copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
     }
 
+    public void validateCopartition() {
+        final List<Set<String>> copartitionGroups =
+                copartitionSourceGroups
+                        .stream()
+                        .map(sourceGroup -> sourceGroup
+                                .stream()
+                                .flatMap(node -> nodeToSourceTopics.get(node).stream())
+                                .collect(Collectors.toSet())
+                        ).collect(Collectors.toList());
+        for (final Set<String> copartition : copartitionGroups) {
+            final Map<String, Integer> topicPartNum = new HashMap<>();
+            copartition.forEach(topic -> {
+                final InternalTopicProperties prop = internalTopicNamesWithProperties.get(topic);
+                if (prop != null && prop.getNumberOfPartitions().isPresent()) {
+                    topicPartNum.put(topic, prop.getNumberOfPartitions().get());
+                }
+            });
+            internalTopicNamesWithProperties.forEach((topic, prop) -> {
+                if (copartition.contains(topic) && prop.getNumberOfPartitions().isPresent()) {
+                    topicPartNum.put(topic, prop.getNumberOfPartitions().get());

Review comment:
       Sorry, this was a side effect of a bad rebase.
   I have removed the redundant part.







[GitHub] [kafka] zhaohaidao commented on a change in pull request #8550: KAFKA-9850 Move KStream#repartition operator validation during Topolo…

Posted by GitBox <gi...@apache.org>.
zhaohaidao commented on a change in pull request #8550:
URL: https://github.com/apache/kafka/pull/8550#discussion_r415335411



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
##########
@@ -312,6 +312,7 @@ public void buildAndOptimizeTopology(final Properties props) {
                 graphNodePriorityQueue.offer(graphNode);
             }
         }
+        internalTopologyBuilder.validateCoPartition();

Review comment:
       I'm not sure whether we can remove the later-stage validation code.
   
   @mjsax @lkokhreidze Can you give some advice?







[GitHub] [kafka] zhaohaidao commented on a change in pull request #8550: KAFKA-9850 Move KStream#repartition operator validation during Topolo…

Posted by GitBox <gi...@apache.org>.
zhaohaidao commented on a change in pull request #8550:
URL: https://github.com/apache/kafka/pull/8550#discussion_r418820500



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -633,6 +634,43 @@ public final void copartitionSources(final Collection<String> sourceNodes) {
         copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
     }
 
+    public void validateCopartition() {
+        final List<Set<String>> copartitionGroups =
+                copartitionSourceGroups
+                        .stream()
+                        .map(sourceGroup -> sourceGroup
+                                .stream()
+                                .flatMap(node -> nodeToSourceTopics.get(node).stream())
+                                .collect(Collectors.toSet())
+                        ).collect(Collectors.toList());
+        for (final Set<String> copartition : copartitionGroups) {
+            final Map<String, Integer> topicPartNum = new HashMap<>();
+            copartition.forEach(topic -> {
+                final InternalTopicProperties prop = internalTopicNamesWithProperties.get(topic);
+                if (prop != null && prop.getNumberOfPartitions().isPresent()) {
+                    topicPartNum.put(topic, prop.getNumberOfPartitions().get());
+                }
+            });
+            internalTopicNamesWithProperties.forEach((topic, prop) -> {
+                if (copartition.contains(topic) && prop.getNumberOfPartitions().isPresent()) {
+                    topicPartNum.put(topic, prop.getNumberOfPartitions().get());
+                }
+            });
+            if (copartition.equals(topicPartNum.keySet())) {
+                final Collection<Integer> partNums = topicPartNum.values();
+                final Integer first = partNums.iterator().next();
+                for (final Integer partNum : partNums) {
+                    if (partNum.equals(first)) {

Review comment:
       Done. Sorry for the careless mistake.
   







[GitHub] [kafka] zhaohaidao commented on pull request #8550: KAFKA-9850 Move KStream#repartition operator validation during Topolo…

Posted by GitBox <gi...@apache.org>.
zhaohaidao commented on pull request #8550:
URL: https://github.com/apache/kafka/pull/8550#issuecomment-624766057


   @mjsax Hi, the failing tests have been fixed. Could you retest this?





[GitHub] [kafka] mjsax commented on pull request #8550: KAFKA-9850 Move KStream#repartition operator validation during Topolo…

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8550:
URL: https://github.com/apache/kafka/pull/8550#issuecomment-622651723


   Thanks for the update. I'll wait for Jenkins and do another pass afterwards.
   
   Retest this please.





[GitHub] [kafka] zhaohaidao commented on a change in pull request #8550: KAFKA-9850 Move KStream#repartition operator validation during Topolo…

Posted by GitBox <gi...@apache.org>.
zhaohaidao commented on a change in pull request #8550:
URL: https://github.com/apache/kafka/pull/8550#discussion_r415334757



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -633,6 +634,45 @@ public final void copartitionSources(final Collection<String> sourceNodes) {
         copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
     }
 
+    public void validateCoPartition() {
+        final List<Set<String>> copartitionGroups =
+                copartitionSourceGroups
+                        .stream()
+                        .map(sourceGroup -> sourceGroup
+                                .stream()
+                                .flatMap(node -> nodeToSourceTopics.get(node).stream())
+                                .collect(Collectors.toSet())
+                        ).collect(Collectors.toList());
+        for (final Set<String> coPartition : copartitionGroups) {
+            final Map<String, InternalTopicProperties> coPartitionProperties = new HashMap<>();
+            internalTopicNamesWithProperties.forEach((topic, prop) -> {
+                if (coPartition.contains(topic) && prop.getNumberOfPartitions().isPresent()) {
+                    coPartitionProperties.put(topic, prop);
+                }
+            });
+            if (coPartition.size() == coPartitionProperties.size()) {

Review comment:
       It's my pleasure.
   If coPartition.size() != coPartitionProperties.size(), then not every input topic has a corresponding internal topic with a specified number of partitions, so we can simply skip this validation. You can see the original validation in CopartitionedTopicsEnforcer#enforce:
   ```
   if (copartitionGroup.equals(repartitionTopicConfigs.keySet())) {
       ...
       validateAndGetNumOfPartitions
       ...
   }
   ```
   If some of the input topics don't have a repartition operation, the partition count of their internal topics can be deduced from the others that do. See KStreamRepartitionIntegrationTest#shouldDeductNumberOfPartitionsFromRepartitionOperation for more details.
   
   So we can skip this validation if coPartition.size() != coPartitionProperties.size().
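The guard described above can be sketched as a predicate. Because the per-topic map in this revision is only ever filled with topics from the group (the `coPartition.contains(topic)` filter), its key set is a subset of the group, so comparing sizes is equivalent to the `equals(keySet())` check quoted from `CopartitionedTopicsEnforcer#enforce`. `CopartitionGuardSketch` is a hypothetical name.

```java
import java.util.Map;
import java.util.Set;

final class CopartitionGuardSketch {

    /**
     * Returns true only when every topic in the group has a user-specified partition
     * count, i.e. when a mismatch can already be detected at build time. If some counts
     * are missing, they are deduced later from the others, so the check is skipped.
     */
    static boolean canValidateAtBuildTime(final Set<String> copartitionGroup,
                                          final Map<String, ?> numberOfPartitionsPerTopic) {
        // The map's keys are a subset of the group, so set equality reduces to equal sizes.
        return copartitionGroup.equals(numberOfPartitionsPerTopic.keySet());
    }
}
```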







[GitHub] [kafka] zhaohaidao commented on a change in pull request #8550: KAFKA-9850 Move KStream#repartition operator validation during Topolo…

Posted by GitBox <gi...@apache.org>.
zhaohaidao commented on a change in pull request #8550:
URL: https://github.com/apache/kafka/pull/8550#discussion_r418813975



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamRepartitionTest.java
##########
@@ -104,6 +114,54 @@ public void shouldInvokePartitionerWhenSet() {
         verify(streamPartitionerMock);
     }
 
+    @Test
+    public void shouldThrowAnExceptionWhenNumberOfPartitionsOfRepartitionOperationsDoNotMatchWhenJoining() {
+        final String topicB = "topic-b";
+        final String outputTopic = "topic-output";
+        final String topicBRepartitionedName = "topic-b-scale-up";
+        final String inputTopicRepartitionedName = "input-topic-scale-up";
+        final int topicBNumberOfPartitions = 2;
+        final int inputTopicNumberOfPartitions = 4;
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final Repartitioned<Integer, String> inputTopicRepartitioned = Repartitioned
+                .<Integer, String>as(inputTopicRepartitionedName)
+                .withNumberOfPartitions(inputTopicNumberOfPartitions);
+
+        final Repartitioned<Integer, String> topicBRepartitioned = Repartitioned
+                .<Integer, String>as(topicBRepartitionedName)
+                .withNumberOfPartitions(topicBNumberOfPartitions);
+
+        final KStream<Integer, String> topicBStream = builder
+                .stream(topicB, Consumed.with(Serdes.Integer(), Serdes.String()))
+                .repartition(topicBRepartitioned);
+
+        builder.stream(inputTopic, Consumed.with(Serdes.Integer(), Serdes.String()))
+                .repartition(inputTopicRepartitioned)
+                .join(topicBStream, (value1, value2) -> value2, JoinWindows.of(Duration.ofSeconds(10)))
+                .to(outputTopic);
+
+        final Map<String, Integer> repartitionTopicsWithNumOfPartitions = Utils.mkMap(
+                Utils.mkEntry(toRepartitionTopicName(topicBRepartitionedName), topicBNumberOfPartitions),
+                Utils.mkEntry(toRepartitionTopicName(inputTopicRepartitionedName), inputTopicNumberOfPartitions)
+        );
+
+        try {
+            builder.build(props);
+            Assert.fail();
+        } catch (final TopologyException t) {

Review comment:
       Done







[GitHub] [kafka] mjsax merged pull request #8550: KAFKA-9850 Move KStream#repartition operator validation during Topolo…

Posted by GitBox <gi...@apache.org>.
mjsax merged pull request #8550:
URL: https://github.com/apache/kafka/pull/8550


   





[GitHub] [kafka] zhaohaidao commented on a change in pull request #8550: KAFKA-9850 Move KStream#repartition operator validation during Topolo…

Posted by GitBox <gi...@apache.org>.
zhaohaidao commented on a change in pull request #8550:
URL: https://github.com/apache/kafka/pull/8550#discussion_r418815217



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -633,6 +634,43 @@ public final void copartitionSources(final Collection<String> sourceNodes) {
         copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
     }
 
+    public void validateCopartition() {
+        final List<Set<String>> copartitionGroups =
+                copartitionSourceGroups
+                        .stream()
+                        .map(sourceGroup -> sourceGroup
+                                .stream()
+                                .flatMap(node -> nodeToSourceTopics.get(node).stream())

Review comment:
       Done







[GitHub] [kafka] zhaohaidao commented on a change in pull request #8550: KAFKA-9850 Move KStream#repartition operator validation during Topolo…

Posted by GitBox <gi...@apache.org>.
zhaohaidao commented on a change in pull request #8550:
URL: https://github.com/apache/kafka/pull/8550#discussion_r418817105



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -633,6 +634,43 @@ public final void copartitionSources(final Collection<String> sourceNodes) {
         copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
     }
 
+    public void validateCopartition() {
+        final List<Set<String>> copartitionGroups =
+                copartitionSourceGroups
+                        .stream()
+                        .map(sourceGroup -> sourceGroup
+                                .stream()
+                                .flatMap(node -> nodeToSourceTopics.get(node).stream())
+                                .collect(Collectors.toSet())
+                        ).collect(Collectors.toList());
+        for (final Set<String> copartition : copartitionGroups) {
+            final Map<String, Integer> topicPartNum = new HashMap<>();
+            copartition.forEach(topic -> {
+                final InternalTopicProperties prop = internalTopicNamesWithProperties.get(topic);
+                if (prop != null && prop.getNumberOfPartitions().isPresent()) {
+                    topicPartNum.put(topic, prop.getNumberOfPartitions().get());
+                }
+            });
+            internalTopicNamesWithProperties.forEach((topic, prop) -> {
+                if (copartition.contains(topic) && prop.getNumberOfPartitions().isPresent()) {
+                    topicPartNum.put(topic, prop.getNumberOfPartitions().get());
+                }
+            });
+            if (copartition.equals(topicPartNum.keySet())) {
+                final Collection<Integer> partNums = topicPartNum.values();
+                final Integer first = partNums.iterator().next();

Review comment:
       done







[GitHub] [kafka] mjsax commented on pull request #8550: KAFKA-9850 Move KStream#repartition operator validation during Topolo…

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8550:
URL: https://github.com/apache/kafka/pull/8550#issuecomment-622630031


   Retest this please.





[GitHub] [kafka] mjsax commented on pull request #8550: KAFKA-9850 Move KStream#repartition operator validation during Topolo…

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8550:
URL: https://github.com/apache/kafka/pull/8550#issuecomment-625595081


   Retest this please.





[GitHub] [kafka] mjsax commented on pull request #8550: KAFKA-9850 Move KStream#repartition operator validation during Topolo…

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8550:
URL: https://github.com/apache/kafka/pull/8550#issuecomment-625595157


   Will try to review in the next couple of days.





[GitHub] [kafka] abbccdda commented on a change in pull request #8550: KAFKA-9850 Move KStream#repartition operator validation during Topolo…

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #8550:
URL: https://github.com/apache/kafka/pull/8550#discussion_r415209254



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
##########
@@ -312,6 +312,7 @@ public void buildAndOptimizeTopology(final Properties props) {
                 graphNodePriorityQueue.offer(graphNode);
             }
         }
+        internalTopologyBuilder.validateCoPartition();

Review comment:
       nit: validateCopartition

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -633,6 +634,45 @@ public final void copartitionSources(final Collection<String> sourceNodes) {
         copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
     }
 
+    public void validateCoPartition() {
+        final List<Set<String>> copartitionGroups =
+                copartitionSourceGroups
+                        .stream()
+                        .map(sourceGroup -> sourceGroup
+                                .stream()
+                                .flatMap(node -> nodeToSourceTopics.get(node).stream())
+                                .collect(Collectors.toSet())
+                        ).collect(Collectors.toList());
+        for (final Set<String> coPartition : copartitionGroups) {
+            final Map<String, InternalTopicProperties> coPartitionProperties = new HashMap<>();

Review comment:
       Let's try to be consistent and use `copartition` instead of `coPartition`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -633,6 +634,45 @@ public final void copartitionSources(final Collection<String> sourceNodes) {
         copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
     }
 
+    public void validateCoPartition() {
+        final List<Set<String>> copartitionGroups =
+                copartitionSourceGroups
+                        .stream()
+                        .map(sourceGroup -> sourceGroup
+                                .stream()
+                                .flatMap(node -> nodeToSourceTopics.get(node).stream())
+                                .collect(Collectors.toSet())
+                        ).collect(Collectors.toList());
+        for (final Set<String> coPartition : copartitionGroups) {
+            final Map<String, InternalTopicProperties> coPartitionProperties = new HashMap<>();

Review comment:
       Since we are only going to verify the number of partitions, I think we could just use an integer as the map value.
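A minimal sketch of what this suggestion amounts to, assuming each internal topic's properties can be reduced to an `Optional<Integer>` partition count (as `prop.getNumberOfPartitions()` in the diff suggests). The class `PartitionCountCollector`, the method `collectPartitionCounts`, and the `declaredPartitions` parameter are hypothetical names for illustration, not the builder's actual API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical helper: `declaredPartitions` stands in for the builder's
// internal-topic properties, reduced to an Optional partition count per topic.
final class PartitionCountCollector {

    private PartitionCountCollector() { }

    // Keep only topics of one copartition group whose partition count is
    // already known at build time, storing the count as a plain Integer
    // instead of the whole properties object.
    static Map<String, Integer> collectPartitionCounts(
            final Set<String> copartitionGroup,
            final Map<String, Optional<Integer>> declaredPartitions) {
        final Map<String, Integer> topicPartNum = new HashMap<>();
        declaredPartitions.forEach((topic, numPartitions) -> {
            if (copartitionGroup.contains(topic) && numPartitions.isPresent()) {
                topicPartNum.put(topic, numPartitions.get());
            }
        });
        return topicPartNum;
    }
}
```

Storing only the count keeps the map focused on the one property the check compares.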

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
##########
@@ -312,6 +312,7 @@ public void buildAndOptimizeTopology(final Properties props) {
                 graphNodePriorityQueue.offer(graphNode);
             }
         }
+        internalTopologyBuilder.validateCoPartition();

Review comment:
       One question: since we now do the verification in the topology builder, is there any validation code in a later stage that could be removed?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -633,6 +634,45 @@ public final void copartitionSources(final Collection<String> sourceNodes) {
         copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
     }
 
+    public void validateCoPartition() {
+        final List<Set<String>> copartitionGroups =
+                copartitionSourceGroups
+                        .stream()
+                        .map(sourceGroup -> sourceGroup
+                                .stream()
+                                .flatMap(node -> nodeToSourceTopics.get(node).stream())
+                                .collect(Collectors.toSet())
+                        ).collect(Collectors.toList());
+        for (final Set<String> coPartition : copartitionGroups) {
+            final Map<String, InternalTopicProperties> coPartitionProperties = new HashMap<>();
+            internalTopicNamesWithProperties.forEach((topic, prop) -> {
+                if (coPartition.contains(topic) && prop.getNumberOfPartitions().isPresent()) {
+                    coPartitionProperties.put(topic, prop);
+                }
+            });
+            if (coPartition.size() == coPartitionProperties.size()) {

Review comment:
       Could you clarify why we need this equality check?
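One plausible reading of the equality check, sketched under assumptions: only topics whose partition count is fixed at build time (e.g. topics created by `KStream#repartition` with an explicit count) appear in the map, so the comparison can only be completed when every topic in the group has a known count; otherwise it is left to a later stage. The class `CopartitionCheck` is hypothetical, and `IllegalStateException` is a stand-in for whatever exception type the builder actually throws.

```java
import java.util.Collection;
import java.util.Map;
import java.util.Set;

final class CopartitionCheck {

    private CopartitionCheck() { }

    // `copartitionGroup`: topics that must be copartitioned.
    // `topicPartNum`: only those topics whose partition count is already known.
    static void validate(final Set<String> copartitionGroup,
                         final Map<String, Integer> topicPartNum) {
        // If some topic in the group has no known count yet (presumably a user
        // source topic, whose count is only known at runtime), the check cannot
        // be completed here, so it is skipped.
        if (topicPartNum.isEmpty() || !copartitionGroup.equals(topicPartNum.keySet())) {
            return;
        }
        final Collection<Integer> partNums = topicPartNum.values();
        final Integer first = partNums.iterator().next();
        for (final Integer partNum : partNums) {
            if (!first.equals(partNum)) {
                // Stand-in exception type; the real builder may use another one.
                throw new IllegalStateException(
                    "Copartitioned topics must have the same number of partitions, but found "
                        + topicPartNum);
            }
        }
    }
}
```

Without the equality check, a group mixing known and unknown counts could pass or fail based on partial information.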







[GitHub] [kafka] zhaohaidao commented on pull request #8550: KAFKA-9850 Move KStream#repartition operator validation during Topolo…

Posted by GitBox <gi...@apache.org>.
zhaohaidao commented on pull request #8550:
URL: https://github.com/apache/kafka/pull/8550#issuecomment-626255004


   > Retest this please.
   
   @mjsax could you retest this? I ran the tests on my laptop and the failed case passed.





[GitHub] [kafka] mjsax commented on pull request #8550: KAFKA-9850 Move KStream#repartition operator validation during Topolo…

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8550:
URL: https://github.com/apache/kafka/pull/8550#issuecomment-622651782


   Retest this please.





[GitHub] [kafka] zhaohaidao commented on a change in pull request #8550: KAFKA-9850 Move KStream#repartition operator validation during Topolo…

Posted by GitBox <gi...@apache.org>.
zhaohaidao commented on a change in pull request #8550:
URL: https://github.com/apache/kafka/pull/8550#discussion_r418815490



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -633,6 +634,43 @@ public final void copartitionSources(final Collection<String> sourceNodes) {
         copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
     }
 
+    public void validateCopartition() {
+        final List<Set<String>> copartitionGroups =
+                copartitionSourceGroups
+                        .stream()
+                        .map(sourceGroup -> sourceGroup
+                                .stream()
+                                .flatMap(node -> nodeToSourceTopics.get(node).stream())
+                                .collect(Collectors.toSet())
+                        ).collect(Collectors.toList());
+        for (final Set<String> copartition : copartitionGroups) {
+            final Map<String, Integer> topicPartNum = new HashMap<>();

Review comment:
       Done and thanks for your advice.







[GitHub] [kafka] mjsax edited a comment on pull request #8550: KAFKA-9850 Move KStream#repartition operator validation during Topolo…

Posted by GitBox <gi...@apache.org>.
mjsax edited a comment on pull request #8550:
URL: https://github.com/apache/kafka/pull/8550#issuecomment-622651723


   Thanks for the update. I'll wait for Jenkins and do another pass afterwards.





[GitHub] [kafka] mjsax commented on pull request #8550: KAFKA-9850 Move KStream#repartition operator validation during Topolo…

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8550:
URL: https://github.com/apache/kafka/pull/8550#issuecomment-628317830


   Thanks for the PR @zhaohaidao! Merged to `trunk`.





[GitHub] [kafka] zhaohaidao commented on pull request #8550: KAFKA-9850 Move KStream#repartition operator validation during Topolo…

Posted by GitBox <gi...@apache.org>.
zhaohaidao commented on pull request #8550:
URL: https://github.com/apache/kafka/pull/8550#issuecomment-622651191


   > Retest this please.
   
   @mjsax I have modified the code according to your suggestion and the tests pass. Could you continue reviewing it?





[GitHub] [kafka] mjsax edited a comment on pull request #8550: KAFKA-9850 Move KStream#repartition operator validation during Topolo…

Posted by GitBox <gi...@apache.org>.
mjsax edited a comment on pull request #8550:
URL: https://github.com/apache/kafka/pull/8550#issuecomment-622651723


   Thanks for the update. I'll wait for Jenkins and do another pass afterwards.
   
   Btw: the comment "Retest this please" is not for you, but for Jenkins to run the build :) -- you can ignore those comments. (Also note, only committers can trigger builds so it won't work for you).





[GitHub] [kafka] abbccdda commented on pull request #8550: KAFKA-9850 Move KStream#repartition operator validation during Topolo…

Posted by GitBox <gi...@apache.org>.
abbccdda commented on pull request #8550:
URL: https://github.com/apache/kafka/pull/8550#issuecomment-621320858


   @mjsax @vvcephei I think this change makes sense, could you two also take a look?





[GitHub] [kafka] zhaohaidao commented on a change in pull request #8550: KAFKA-9850 Move KStream#repartition operator validation during Topolo…

Posted by GitBox <gi...@apache.org>.
zhaohaidao commented on a change in pull request #8550:
URL: https://github.com/apache/kafka/pull/8550#discussion_r418814943



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -633,6 +634,43 @@ public final void copartitionSources(final Collection<String> sourceNodes) {
         copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
     }
 
+    public void validateCopartition() {
+        final List<Set<String>> copartitionGroups =

Review comment:
       Makes total sense. Thanks for your advice.



