Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/08/07 17:54:27 UTC

[GitHub] [kafka] lct45 opened a new pull request #9141: MINOR: Improve checks for CogroupedStreamAggregateBuilder

lct45 opened a new pull request #9141:
URL: https://github.com/apache/kafka/pull/9141


   Updated `CogroupedStreamAggregateBuilder` to have a separate build method for each kind of aggregation: non-windowed, time-windowed, and session-windowed. Previously, every option was passed into a single builder with all but the current aggregation type set to null, and the builder then checked which value was not null.
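
   As a rough illustration of the change described above, the following sketch contrasts a single entry point that inspects nullable options with one method per aggregation kind. The method names `buildNotWindowed`, `buildTimeWindows`, and `buildSessionWindows` come from this PR; the class `OverloadSketch`, the `buildWithNullChecks` helper, the `String`/`long` parameters, and the returned strings are hypothetical stand-ins, not the real Kafka Streams types.

```java
// Hypothetical sketch: plain strings and longs stand in for the real
// Kafka Streams store builders, windows, and KTable results.
public class OverloadSketch {

    // Before: one entry point; unused options are passed as null and
    // the method works out at runtime which aggregation was meant.
    static String buildWithNullChecks(final String storeName,
                                      final Long windowSizeMs,
                                      final Long sessionGapMs) {
        if (windowSizeMs != null) {
            return "time-windowed:" + storeName + ":" + windowSizeMs;
        }
        if (sessionGapMs != null) {
            return "session-windowed:" + storeName + ":" + sessionGapMs;
        }
        return "not-windowed:" + storeName;
    }

    // After: one method per aggregation kind, each taking only the
    // parameters that kind needs, so no null checks are required.
    static String buildNotWindowed(final String storeName) {
        return "not-windowed:" + storeName;
    }

    static String buildTimeWindows(final String storeName, final long windowSizeMs) {
        return "time-windowed:" + storeName + ":" + windowSizeMs;
    }

    static String buildSessionWindows(final String storeName, final long sessionGapMs) {
        return "session-windowed:" + storeName + ":" + sessionGapMs;
    }

    public static void main(final String[] args) {
        System.out.println(buildWithNullChecks("store", null, 30_000L));
        System.out.println(buildSessionWindows("store", 30_000L));
    }
}
```

   With separate methods, a caller cannot accidentally pass both a time window and a session window, which the null-checking version would silently resolve in favor of the first non-null option.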
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] lct45 commented on a change in pull request #9141: MINOR: Improve checks for CogroupedStreamAggregateBuilder

Posted by GitBox <gi...@apache.org>.
lct45 commented on a change in pull request #9141:
URL: https://github.com/apache/kafka/pull/9141#discussion_r467236081



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
##########
@@ -47,18 +47,96 @@
     CogroupedStreamAggregateBuilder(final InternalStreamsBuilder builder) {
         this.builder = builder;
     }
-
-    <KR, VIn, W extends Window> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
+    <KR, VIn, W extends Window> KTable<KR, VOut> buildNotWindowed(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
                                                        final Initializer<VOut> initializer,
                                                        final NamedInternal named,
                                                        final StoreBuilder<?> storeBuilder,
                                                        final Serde<KR> keySerde,
                                                        final Serde<VOut> valueSerde,
-                                                       final String queryableName,
-                                                       final Windows<W> windows,
-                                                       final SessionWindows sessionWindows,
-                                                       final Merger<? super K, VOut> sessionMerger) {
+                                                       final String queryableName) {
+        build(groupPatterns, storeBuilder);
+        final Collection<StreamsGraphNode> processors = new ArrayList<>();
+        boolean stateCreated = false;
+        int counter = 0;
+        for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
+            final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
+                    initializer,
+                    named.suffixWithOrElseGet(
+                            "-cogroup-agg-" + counter++,
+                            builder,
+                            CogroupedKStreamImpl.AGGREGATE_NAME),
+                    stateCreated,
+                    storeBuilder,
+                    new KStreamAggregate<>(storeBuilder.name(), initializer, kGroupedStream.getValue()));
+            stateCreated = true;
+            processors.add(statefulProcessorNode);
+            builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
+        }
+        return createTable(processors, named, keySerde, valueSerde, queryableName);
+    }
+
+    <KR, VIn, W extends Window> KTable<KR, VOut> buildTimeWindows(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
+                                                              final Initializer<VOut> initializer,
+                                                              final NamedInternal named,
+                                                              final StoreBuilder<?> storeBuilder,
+                                                              final Serde<KR> keySerde,
+                                                              final Serde<VOut> valueSerde,
+                                                              final String queryableName,
+                                                                  final Windows<W> windows) {
+        build(groupPatterns, storeBuilder);
 
+        final Collection<StreamsGraphNode> processors = new ArrayList<>();
+        boolean stateCreated = false;
+        int counter = 0;
+        for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
+            final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
+                    initializer,
+                    named.suffixWithOrElseGet(
+                            "-cogroup-agg-" + counter++,
+                            builder,
+                            CogroupedKStreamImpl.AGGREGATE_NAME),
+                    stateCreated,
+                    storeBuilder,
+                    new KStreamWindowAggregate<>(windows, storeBuilder.name(), initializer, kGroupedStream.getValue()));
+            stateCreated = true;
+            processors.add(statefulProcessorNode);
+            builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
+        }
+        return createTable(processors, named, keySerde, valueSerde, queryableName);
+    }
+
+    <KR, VIn, W extends Window> KTable<KR, VOut> buildSessionWindows(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
+                                                                  final Initializer<VOut> initializer,
+                                                                  final NamedInternal named,
+                                                                  final StoreBuilder<?> storeBuilder,
+                                                                  final Serde<KR> keySerde,
+                                                                  final Serde<VOut> valueSerde,
+                                                                  final String queryableName,
+                                                                  final SessionWindows sessionWindows,
+                                                                     final Merger<? super K, VOut> sessionMerger) {
+        build(groupPatterns, storeBuilder);
+        final Collection<StreamsGraphNode> processors = new ArrayList<>();
+        boolean stateCreated = false;
+        int counter = 0;
+        for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
+            final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
+                    initializer,
+                    named.suffixWithOrElseGet(
+                            "-cogroup-agg-" + counter++,
+                            builder,
+                            CogroupedKStreamImpl.AGGREGATE_NAME),
+                    stateCreated,
+                    storeBuilder,
+                    new KStreamSessionWindowAggregate<>(sessionWindows, storeBuilder.name(), initializer, kGroupedStream.getValue(), sessionMerger));
+            stateCreated = true;
+            processors.add(statefulProcessorNode);
+            builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
+        }
+        return createTable(processors, named, keySerde, valueSerde, queryableName);
+    }
+
+    private void build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
+                                                       final StoreBuilder<?> storeBuilder) {

Review comment:
       I went with `processRepartitions` for this method and renamed all the other methods to just `build` with different parameters, since that seems cleaner.







[GitHub] [kafka] vvcephei commented on pull request #9141: MINOR: Improve checks for CogroupedStreamAggregateBuilder

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9141:
URL: https://github.com/apache/kafka/pull/9141#issuecomment-671622789


   Verified the last commit only changed whitespace before merging.





[GitHub] [kafka] vvcephei commented on a change in pull request #9141: MINOR: Improve checks for CogroupedStreamAggregateBuilder

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9141:
URL: https://github.com/apache/kafka/pull/9141#discussion_r468206463



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
##########
@@ -85,68 +160,38 @@
         groupedStreams.remove(kGrouped);
         kGrouped.ensureCopartitionWith(groupedStreams);
 
-        final Collection<StreamsGraphNode> processors = new ArrayList<>();
-        boolean stateCreated = false;
-        int counter = 0;
-        for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
-            final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
-                kGroupedStream.getValue(),
-                initializer,
-                named.suffixWithOrElseGet(
-                    "-cogroup-agg-" + counter++,
-                    builder,
-                    CogroupedKStreamImpl.AGGREGATE_NAME),
-                stateCreated,
-                storeBuilder,
-                windows,
-                sessionWindows,
-                sessionMerger);
-            stateCreated = true;
-            processors.add(statefulProcessorNode);
-            builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
-        }
+    }
+
+    <KR, VIn> KTable<KR, VOut> createTable(final Collection<StreamsGraphNode> processors,
+                                           final NamedInternal named,
+                                           final Serde<KR> keySerde,
+                                           final Serde<VOut> valueSerde,
+                                           final String queryableName) {
         final String mergeProcessorName = named.suffixWithOrElseGet(
-            "-cogroup-merge",
-            builder,
-            CogroupedKStreamImpl.MERGE_NAME);
+                "-cogroup-merge",
+                builder,
+                CogroupedKStreamImpl.MERGE_NAME);

Review comment:
       It seems like your indentation is set to 8 spaces instead of 4.







[GitHub] [kafka] ableegoldman commented on a change in pull request #9141: MINOR: Improve checks for CogroupedStreamAggregateBuilder

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9141:
URL: https://github.com/apache/kafka/pull/9141#discussion_r467210400



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
##########
@@ -47,18 +47,96 @@
     CogroupedStreamAggregateBuilder(final InternalStreamsBuilder builder) {
         this.builder = builder;
     }
-
-    <KR, VIn, W extends Window> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
+    <KR, VIn, W extends Window> KTable<KR, VOut> buildNotWindowed(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,

Review comment:
       WDYT about naming this just `build`? It's not as clear, but I think it's in line with the naming conventions elsewhere. For example, we have `KStreamWindowAggregate` and `KStreamAggregate` (then maybe we can come up with a more descriptive name for the method currently called `build`).

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
##########
@@ -85,68 +163,39 @@
         groupedStreams.remove(kGrouped);
         kGrouped.ensureCopartitionWith(groupedStreams);
 
-        final Collection<StreamsGraphNode> processors = new ArrayList<>();
-        boolean stateCreated = false;
-        int counter = 0;
-        for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
-            final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
-                kGroupedStream.getValue(),
-                initializer,
-                named.suffixWithOrElseGet(
-                    "-cogroup-agg-" + counter++,
-                    builder,
-                    CogroupedKStreamImpl.AGGREGATE_NAME),
-                stateCreated,
-                storeBuilder,
-                windows,
-                sessionWindows,
-                sessionMerger);
-            stateCreated = true;
-            processors.add(statefulProcessorNode);
-            builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
-        }
+    }
+
+    <KR, VIn, W extends Window> KTable<KR, VOut> createTable(final Collection<StreamsGraphNode> processors,
+                                                             final NamedInternal named,
+                                                             final Serde<KR> keySerde,
+                                                             final Serde<VOut> valueSerde,
+                                                             final String queryableName) {
         final String mergeProcessorName = named.suffixWithOrElseGet(
-            "-cogroup-merge",
-            builder,
-            CogroupedKStreamImpl.MERGE_NAME);
+                "-cogroup-merge",
+                builder,
+                CogroupedKStreamImpl.MERGE_NAME);
         final ProcessorSupplier<K, VOut> passThrough = new PassThrough<>();
         final ProcessorGraphNode<K, VOut> mergeNode =
-            new ProcessorGraphNode<>(mergeProcessorName, new ProcessorParameters<>(passThrough, mergeProcessorName));
+                new ProcessorGraphNode<>(mergeProcessorName, new ProcessorParameters<>(passThrough, mergeProcessorName));
 
         builder.addGraphNode(processors, mergeNode);
 
         return new KTableImpl<KR, VIn, VOut>(
-            mergeProcessorName,
-            keySerde,
-            valueSerde,
-            Collections.singleton(mergeNode.nodeName()),
-            queryableName,
-            passThrough,
-            mergeNode,
-            builder);
+                mergeProcessorName,
+                keySerde,
+                valueSerde,
+                Collections.singleton(mergeNode.nodeName()),
+                queryableName,
+                passThrough,
+                mergeNode,
+                builder);
     }
 
-    private <W extends Window> StatefulProcessorNode<K, ?> getStatefulProcessorNode(final Aggregator<? super K, Object, VOut> aggregator,
-                                                                                    final Initializer<VOut> initializer,
+    private <W extends Window> StatefulProcessorNode<K, ?> getStatefulProcessorNode(final Initializer<VOut> initializer,

Review comment:
       We can remove the `<W extends Window>` now, right? Also it looks like the `initializer` input isn't needed anymore either

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
##########
@@ -85,68 +163,39 @@
         groupedStreams.remove(kGrouped);
         kGrouped.ensureCopartitionWith(groupedStreams);
 
-        final Collection<StreamsGraphNode> processors = new ArrayList<>();
-        boolean stateCreated = false;
-        int counter = 0;
-        for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
-            final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
-                kGroupedStream.getValue(),
-                initializer,
-                named.suffixWithOrElseGet(
-                    "-cogroup-agg-" + counter++,
-                    builder,
-                    CogroupedKStreamImpl.AGGREGATE_NAME),
-                stateCreated,
-                storeBuilder,
-                windows,
-                sessionWindows,
-                sessionMerger);
-            stateCreated = true;
-            processors.add(statefulProcessorNode);
-            builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
-        }
+    }
+
+    <KR, VIn, W extends Window> KTable<KR, VOut> createTable(final Collection<StreamsGraphNode> processors,

Review comment:
       Can remove the `W extends Window` generic

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
##########
@@ -47,18 +47,96 @@
     CogroupedStreamAggregateBuilder(final InternalStreamsBuilder builder) {
         this.builder = builder;
     }
-
-    <KR, VIn, W extends Window> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
+    <KR, VIn, W extends Window> KTable<KR, VOut> buildNotWindowed(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
                                                        final Initializer<VOut> initializer,
                                                        final NamedInternal named,
                                                        final StoreBuilder<?> storeBuilder,
                                                        final Serde<KR> keySerde,
                                                        final Serde<VOut> valueSerde,
-                                                       final String queryableName,
-                                                       final Windows<W> windows,
-                                                       final SessionWindows sessionWindows,
-                                                       final Merger<? super K, VOut> sessionMerger) {
+                                                       final String queryableName) {
+        build(groupPatterns, storeBuilder);
+        final Collection<StreamsGraphNode> processors = new ArrayList<>();
+        boolean stateCreated = false;
+        int counter = 0;
+        for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
+            final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
+                    initializer,
+                    named.suffixWithOrElseGet(
+                            "-cogroup-agg-" + counter++,
+                            builder,
+                            CogroupedKStreamImpl.AGGREGATE_NAME),
+                    stateCreated,
+                    storeBuilder,
+                    new KStreamAggregate<>(storeBuilder.name(), initializer, kGroupedStream.getValue()));
+            stateCreated = true;
+            processors.add(statefulProcessorNode);
+            builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
+        }
+        return createTable(processors, named, keySerde, valueSerde, queryableName);
+    }
+
+    <KR, VIn, W extends Window> KTable<KR, VOut> buildTimeWindows(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,

Review comment:
       Honestly I think it's fine to just name all three of these `build`, since they accept different parameters and it should be pretty clear from the context whether it's windowed or not. But being more descriptive is never a bad thing either. Your call 🙂 

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
##########
@@ -47,18 +47,96 @@
     CogroupedStreamAggregateBuilder(final InternalStreamsBuilder builder) {
         this.builder = builder;
     }
-
-    <KR, VIn, W extends Window> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
+    <KR, VIn, W extends Window> KTable<KR, VOut> buildNotWindowed(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,

Review comment:
       Also, you can remove the `VIn` and `W extends Window` generics on this method

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
##########
@@ -47,18 +47,96 @@
     CogroupedStreamAggregateBuilder(final InternalStreamsBuilder builder) {
         this.builder = builder;
     }
-
-    <KR, VIn, W extends Window> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
+    <KR, VIn, W extends Window> KTable<KR, VOut> buildNotWindowed(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
                                                        final Initializer<VOut> initializer,
                                                        final NamedInternal named,
                                                        final StoreBuilder<?> storeBuilder,
                                                        final Serde<KR> keySerde,
                                                        final Serde<VOut> valueSerde,
-                                                       final String queryableName,
-                                                       final Windows<W> windows,
-                                                       final SessionWindows sessionWindows,
-                                                       final Merger<? super K, VOut> sessionMerger) {
+                                                       final String queryableName) {
+        build(groupPatterns, storeBuilder);
+        final Collection<StreamsGraphNode> processors = new ArrayList<>();
+        boolean stateCreated = false;
+        int counter = 0;
+        for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
+            final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
+                    initializer,
+                    named.suffixWithOrElseGet(
+                            "-cogroup-agg-" + counter++,
+                            builder,
+                            CogroupedKStreamImpl.AGGREGATE_NAME),
+                    stateCreated,
+                    storeBuilder,
+                    new KStreamAggregate<>(storeBuilder.name(), initializer, kGroupedStream.getValue()));
+            stateCreated = true;
+            processors.add(statefulProcessorNode);
+            builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
+        }
+        return createTable(processors, named, keySerde, valueSerde, queryableName);

Review comment:
       This is all the same for all three methods except for the `KStreamAggregate`/`KStreamWindowAggregate`/etc., right? If you wanted to deduplicate things further, you could factor this out into a method that accepts a `Function<KGroupedStreamImpl, KStreamAggProcessorSupplier>`, and then each of the `build` methods can just pass in a function that returns `new KStreamWindowAggregate` and so on.
   I'm not sure whether it's really worth it, but it can be done in case you were wondering. Up to you whether you want to do it.
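
   A minimal sketch of the deduplication suggested in this comment, under loudly stated assumptions: `CogroupBuildSketch`, `AggSupplier`, and `buildNodes` are hypothetical names, and strings stand in for the real `KGroupedStreamImpl`, `Aggregator`, and `StatefulProcessorNode` types. It only shows the shape of the idea, where the shared loop lives in one method and each `build` overload passes in a function that creates its own kind of supplier.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-ins only; none of these are the real Kafka Streams classes.
public class CogroupBuildSketch {

    // Stand-in for a processor supplier such as KStreamAggregate.
    interface AggSupplier {
        String describe();
    }

    // Shared loop: the only per-variant piece is how the supplier is created,
    // so that piece is passed in as a function of the aggregator.
    static List<String> buildNodes(final Map<String, String> groupPatterns,
                                   final Function<String, AggSupplier> supplierFactory) {
        final List<String> nodes = new ArrayList<>();
        int counter = 0;
        for (final Map.Entry<String, String> entry : groupPatterns.entrySet()) {
            final AggSupplier supplier = supplierFactory.apply(entry.getValue());
            nodes.add("cogroup-agg-" + counter++ + ":" + supplier.describe());
        }
        return nodes;
    }

    // Non-windowed variant: supplies a plain aggregate.
    static List<String> build(final Map<String, String> groupPatterns) {
        return buildNodes(groupPatterns, aggregator -> () -> "plain(" + aggregator + ")");
    }

    // Time-windowed variant: same loop, different supplier.
    static List<String> build(final Map<String, String> groupPatterns, final long windowSizeMs) {
        return buildNodes(
            groupPatterns,
            aggregator -> () -> "windowed(" + aggregator + ", " + windowSizeMs + "ms)");
    }

    public static void main(final String[] args) {
        final Map<String, String> patterns = new LinkedHashMap<>();
        patterns.put("clicks", "sumClicks");
        patterns.put("views", "sumViews");
        System.out.println(build(patterns));
        System.out.println(build(patterns, 5000L));
    }
}
```

   The trade-off is the one raised above: the loop is written once, at the cost of an extra level of indirection in each `build` method.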

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
##########
@@ -47,18 +47,96 @@
     CogroupedStreamAggregateBuilder(final InternalStreamsBuilder builder) {
         this.builder = builder;
     }
-
-    <KR, VIn, W extends Window> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
+    <KR, VIn, W extends Window> KTable<KR, VOut> buildNotWindowed(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
                                                        final Initializer<VOut> initializer,
                                                        final NamedInternal named,
                                                        final StoreBuilder<?> storeBuilder,
                                                        final Serde<KR> keySerde,
                                                        final Serde<VOut> valueSerde,
-                                                       final String queryableName,
-                                                       final Windows<W> windows,
-                                                       final SessionWindows sessionWindows,
-                                                       final Merger<? super K, VOut> sessionMerger) {
+                                                       final String queryableName) {
+        build(groupPatterns, storeBuilder);
+        final Collection<StreamsGraphNode> processors = new ArrayList<>();
+        boolean stateCreated = false;
+        int counter = 0;
+        for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
+            final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
+                    initializer,
+                    named.suffixWithOrElseGet(
+                            "-cogroup-agg-" + counter++,
+                            builder,
+                            CogroupedKStreamImpl.AGGREGATE_NAME),
+                    stateCreated,
+                    storeBuilder,
+                    new KStreamAggregate<>(storeBuilder.name(), initializer, kGroupedStream.getValue()));
+            stateCreated = true;
+            processors.add(statefulProcessorNode);
+            builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
+        }
+        return createTable(processors, named, keySerde, valueSerde, queryableName);
+    }
+
+    <KR, VIn, W extends Window> KTable<KR, VOut> buildTimeWindows(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,

Review comment:
       Don't need `VIn` here

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
##########
@@ -47,18 +47,96 @@
     CogroupedStreamAggregateBuilder(final InternalStreamsBuilder builder) {
         this.builder = builder;
     }
-
-    <KR, VIn, W extends Window> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
+    <KR, VIn, W extends Window> KTable<KR, VOut> buildNotWindowed(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
                                                        final Initializer<VOut> initializer,
                                                        final NamedInternal named,
                                                        final StoreBuilder<?> storeBuilder,
                                                        final Serde<KR> keySerde,
                                                        final Serde<VOut> valueSerde,
-                                                       final String queryableName,
-                                                       final Windows<W> windows,
-                                                       final SessionWindows sessionWindows,
-                                                       final Merger<? super K, VOut> sessionMerger) {
+                                                       final String queryableName) {
+        build(groupPatterns, storeBuilder);
+        final Collection<StreamsGraphNode> processors = new ArrayList<>();
+        boolean stateCreated = false;
+        int counter = 0;
+        for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
+            final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
+                    initializer,
+                    named.suffixWithOrElseGet(
+                            "-cogroup-agg-" + counter++,
+                            builder,
+                            CogroupedKStreamImpl.AGGREGATE_NAME),
+                    stateCreated,
+                    storeBuilder,
+                    new KStreamAggregate<>(storeBuilder.name(), initializer, kGroupedStream.getValue()));
+            stateCreated = true;
+            processors.add(statefulProcessorNode);
+            builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
+        }
+        return createTable(processors, named, keySerde, valueSerde, queryableName);
+    }
+
+    <KR, VIn, W extends Window> KTable<KR, VOut> buildTimeWindows(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
+                                                              final Initializer<VOut> initializer,
+                                                              final NamedInternal named,
+                                                              final StoreBuilder<?> storeBuilder,
+                                                              final Serde<KR> keySerde,
+                                                              final Serde<VOut> valueSerde,
+                                                              final String queryableName,
+                                                                  final Windows<W> windows) {
+        build(groupPatterns, storeBuilder);
 
+        final Collection<StreamsGraphNode> processors = new ArrayList<>();
+        boolean stateCreated = false;
+        int counter = 0;
+        for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
+            final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
+                    initializer,
+                    named.suffixWithOrElseGet(
+                            "-cogroup-agg-" + counter++,
+                            builder,
+                            CogroupedKStreamImpl.AGGREGATE_NAME),
+                    stateCreated,
+                    storeBuilder,
+                    new KStreamWindowAggregate<>(windows, storeBuilder.name(), initializer, kGroupedStream.getValue()));
+            stateCreated = true;
+            processors.add(statefulProcessorNode);
+            builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
+        }
+        return createTable(processors, named, keySerde, valueSerde, queryableName);
+    }
+
+    <KR, VIn, W extends Window> KTable<KR, VOut> buildSessionWindows(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,

Review comment:
       `VIn` and `W extends Window` aren't needed here.
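
       A minimal sketch of the point, using simplified stand-ins rather than the real Kafka Streams classes (`SessionWindowsStub` and `TypeParamSketch` are hypothetical names invented for this illustration): a method-level type parameter only earns its place if the signature or body refers to it. Here only `KR` does, so it is the only one declared.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for this illustration only; not Kafka's SessionWindows.
final class SessionWindowsStub {
    final long gapMs;

    SessionWindowsStub(final long gapMs) {
        this.gapMs = gapMs;
    }
}

// Hypothetical stand-in for the builder; VOut plays the role of the aggregate type.
final class TypeParamSketch<VOut> {

    // The reviewed signature declared <KR, VIn, W extends Window>, but a
    // session-window build never mentions VIn or W. Declaring only <KR>
    // keeps the signature honest about what it depends on.
    <KR> Map<KR, VOut> buildSessionWindows(final List<KR> resultKeys,
                                           final VOut initialValue,
                                           final SessionWindowsStub sessionWindows) {
        if (sessionWindows.gapMs <= 0) {
            throw new IllegalArgumentException("gap must be positive");
        }
        // Placeholder body: seed every result key with the initial aggregate.
        final Map<KR, VOut> table = new LinkedHashMap<>();
        for (final KR key : resultKeys) {
            table.put(key, initialValue);
        }
        return table;
    }

    public static void main(final String[] args) {
        final TypeParamSketch<Long> sketch = new TypeParamSketch<>();
        // KR is inferred as String at the call site; no explicit type arguments needed.
        System.out.println(sketch.buildSessionWindows(
                Arrays.asList("a", "b"), 0L, new SessionWindowsStub(5L)));
    }
}
```

       Call sites that rely on type inference are unaffected by dropping the unused parameters; only callers that spell out explicit type arguments would need to change.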

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
##########
@@ -47,18 +47,96 @@
     CogroupedStreamAggregateBuilder(final InternalStreamsBuilder builder) {
         this.builder = builder;
     }
-
-    <KR, VIn, W extends Window> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
+    <KR, VIn, W extends Window> KTable<KR, VOut> buildNotWindowed(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
                                                        final Initializer<VOut> initializer,
                                                        final NamedInternal named,
                                                        final StoreBuilder<?> storeBuilder,
                                                        final Serde<KR> keySerde,
                                                        final Serde<VOut> valueSerde,
-                                                       final String queryableName,
-                                                       final Windows<W> windows,
-                                                       final SessionWindows sessionWindows,
-                                                       final Merger<? super K, VOut> sessionMerger) {
+                                                       final String queryableName) {
+        build(groupPatterns, storeBuilder);
+        final Collection<StreamsGraphNode> processors = new ArrayList<>();
+        boolean stateCreated = false;
+        int counter = 0;
+        for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
+            final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
+                    initializer,
+                    named.suffixWithOrElseGet(
+                            "-cogroup-agg-" + counter++,
+                            builder,
+                            CogroupedKStreamImpl.AGGREGATE_NAME),
+                    stateCreated,
+                    storeBuilder,
+                    new KStreamAggregate<>(storeBuilder.name(), initializer, kGroupedStream.getValue()));
+            stateCreated = true;
+            processors.add(statefulProcessorNode);
+            builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
+        }
+        return createTable(processors, named, keySerde, valueSerde, queryableName);
+    }
+
+    <KR, VIn, W extends Window> KTable<KR, VOut> buildTimeWindows(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
+                                                              final Initializer<VOut> initializer,
+                                                              final NamedInternal named,
+                                                              final StoreBuilder<?> storeBuilder,
+                                                              final Serde<KR> keySerde,
+                                                              final Serde<VOut> valueSerde,
+                                                              final String queryableName,
+                                                                  final Windows<W> windows) {
+        build(groupPatterns, storeBuilder);
 
+        final Collection<StreamsGraphNode> processors = new ArrayList<>();
+        boolean stateCreated = false;
+        int counter = 0;
+        for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
+            final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
+                    initializer,
+                    named.suffixWithOrElseGet(
+                            "-cogroup-agg-" + counter++,
+                            builder,
+                            CogroupedKStreamImpl.AGGREGATE_NAME),
+                    stateCreated,
+                    storeBuilder,
+                    new KStreamWindowAggregate<>(windows, storeBuilder.name(), initializer, kGroupedStream.getValue()));
+            stateCreated = true;
+            processors.add(statefulProcessorNode);
+            builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
+        }
+        return createTable(processors, named, keySerde, valueSerde, queryableName);
+    }
+
+    <KR, VIn, W extends Window> KTable<KR, VOut> buildSessionWindows(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
+                                                                  final Initializer<VOut> initializer,
+                                                                  final NamedInternal named,
+                                                                  final StoreBuilder<?> storeBuilder,
+                                                                  final Serde<KR> keySerde,
+                                                                  final Serde<VOut> valueSerde,
+                                                                  final String queryableName,
+                                                                  final SessionWindows sessionWindows,
+                                                                     final Merger<? super K, VOut> sessionMerger) {
+        build(groupPatterns, storeBuilder);
+        final Collection<StreamsGraphNode> processors = new ArrayList<>();
+        boolean stateCreated = false;
+        int counter = 0;
+        for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
+            final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
+                    initializer,
+                    named.suffixWithOrElseGet(
+                            "-cogroup-agg-" + counter++,
+                            builder,
+                            CogroupedKStreamImpl.AGGREGATE_NAME),
+                    stateCreated,
+                    storeBuilder,
+                    new KStreamSessionWindowAggregate<>(sessionWindows, storeBuilder.name(), initializer, kGroupedStream.getValue(), sessionMerger));
+            stateCreated = true;
+            processors.add(statefulProcessorNode);
+            builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
+        }
+        return createTable(processors, named, keySerde, valueSerde, queryableName);
+    }
+
+    private void build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
+                                                       final StoreBuilder<?> storeBuilder) {

Review comment:
       Can we rename this to something like `ensureCopartitioning` or `processRepartitions`? Copartitioning is the main point of this method, so it would be good to reflect that in the name.
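
       To illustrate the naming suggestion, here is a rough sketch with hypothetical stand-in types (`GroupedStreamStub` and `NamingSketch` are not Kafka classes, and the method bodies are placeholders, not the PR's logic). With the shared private step named for its purpose, each public builder reads as "process repartitions, then add the aggregation nodes".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a grouped stream; not a Kafka Streams type.
final class GroupedStreamStub {
    final String name;
    final boolean repartitionRequired;

    GroupedStreamStub(final String name, final boolean repartitionRequired) {
        this.name = name;
        this.repartitionRequired = repartitionRequired;
    }
}

final class NamingSketch {

    // Called `build(...)` in the reviewed diff. The name used here is one of
    // the two proposed in the review; the body is a placeholder.
    private static List<String> processRepartitions(final List<GroupedStreamStub> groupedStreams) {
        final List<String> repartitionNodes = new ArrayList<>();
        for (final GroupedStreamStub stream : groupedStreams) {
            if (stream.repartitionRequired) {
                repartitionNodes.add(stream.name + "-repartition");
            }
        }
        return repartitionNodes;
    }

    // Public entry point: the shared step first, then one aggregation node per stream.
    static List<String> buildNotWindowed(final List<GroupedStreamStub> groupedStreams) {
        final List<String> nodes = new ArrayList<>(processRepartitions(groupedStreams));
        int counter = 0;
        for (final GroupedStreamStub stream : groupedStreams) {
            nodes.add(stream.name + "-cogroup-agg-" + counter++);
        }
        return nodes;
    }

    public static void main(final String[] args) {
        System.out.println(buildNotWindowed(Arrays.asList(
                new GroupedStreamStub("a", true),
                new GroupedStreamStub("b", false))));
    }
}
```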







[GitHub] [kafka] vvcephei commented on pull request #9141: MINOR: Improve checks for CogroupedStreamAggregateBuilder

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9141:
URL: https://github.com/apache/kafka/pull/9141#issuecomment-671609419


   All the tests were green except for these flaky ones:
   ```
   org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[false]
   org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
   ```





[GitHub] [kafka] mjsax commented on pull request #9141: MINOR: Improve checks for CogroupedStreamAggregateBuilder

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #9141:
URL: https://github.com/apache/kafka/pull/9141#issuecomment-670745736


   Retest this please.





[GitHub] [kafka] vvcephei merged pull request #9141: MINOR: Improve checks for CogroupedStreamAggregateBuilder

Posted by GitBox <gi...@apache.org>.
vvcephei merged pull request #9141:
URL: https://github.com/apache/kafka/pull/9141


   

