You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/12/11 01:23:59 UTC

[GitHub] [kafka] wcarlson5 commented on a change in pull request #9727: [KAFKA-10417] Update Cogrouped processor to work with suppress() and joins

wcarlson5 commented on a change in pull request #9727:
URL: https://github.com/apache/kafka/pull/9727#discussion_r540619198



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
##########
@@ -115,24 +124,28 @@
                                 final Merger<? super K, VOut> sessionMerger) {
         processRepartitions(groupPatterns, storeBuilder);
         final Collection<GraphNode> processors = new ArrayList<>();
+        final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>();
         boolean stateCreated = false;
         int counter = 0;
         for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
+            final KStreamAggProcessorSupplier<K, K, ?, ?> parentProcessor = (KStreamAggProcessorSupplier<K, K, ?, ?>) new KStreamSessionWindowAggregate<K, K, VOut>(sessionWindows, storeBuilder.name(), initializer, kGroupedStream.getValue(), sessionMerger);

Review comment:
       I think these lines are too long. The ones in the other methods too

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
##########
@@ -201,9 +219,10 @@ private void processRepartitions(final Map<KGroupedStreamImpl<K, ?>, Aggregator<
             "-cogroup-merge",
             builder,
             CogroupedKStreamImpl.MERGE_NAME);
-        final ProcessorSupplier<K, VOut> passThrough = new PassThrough<>();

Review comment:
       I believe PassThrough is only used for the cogroup but now I think you can remove it completely.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org