Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/08/07 19:24:38 UTC

[GitHub] [kafka] ableegoldman commented on a change in pull request #9141: MINOR: Improve checks for CogroupedStreamAggregateBuilder

ableegoldman commented on a change in pull request #9141:
URL: https://github.com/apache/kafka/pull/9141#discussion_r467210400



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
##########
@@ -47,18 +47,96 @@
     CogroupedStreamAggregateBuilder(final InternalStreamsBuilder builder) {
         this.builder = builder;
     }
-
-    <KR, VIn, W extends Window> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
+    <KR, VIn, W extends Window> KTable<KR, VOut> buildNotWindowed(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,

Review comment:
       WDYT about naming this just `build`? It's not as clear, but I think it's in line with the naming conventions elsewhere. For example, we have `KStreamWindowAggregate` and `KStreamAggregate`. (Then maybe we can come up with a more descriptive name for the method currently called `build`.)

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
##########
@@ -85,68 +163,39 @@
         groupedStreams.remove(kGrouped);
         kGrouped.ensureCopartitionWith(groupedStreams);
 
-        final Collection<StreamsGraphNode> processors = new ArrayList<>();
-        boolean stateCreated = false;
-        int counter = 0;
-        for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
-            final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
-                kGroupedStream.getValue(),
-                initializer,
-                named.suffixWithOrElseGet(
-                    "-cogroup-agg-" + counter++,
-                    builder,
-                    CogroupedKStreamImpl.AGGREGATE_NAME),
-                stateCreated,
-                storeBuilder,
-                windows,
-                sessionWindows,
-                sessionMerger);
-            stateCreated = true;
-            processors.add(statefulProcessorNode);
-            builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
-        }
+    }
+
+    <KR, VIn, W extends Window> KTable<KR, VOut> createTable(final Collection<StreamsGraphNode> processors,
+                                                             final NamedInternal named,
+                                                             final Serde<KR> keySerde,
+                                                             final Serde<VOut> valueSerde,
+                                                             final String queryableName) {
         final String mergeProcessorName = named.suffixWithOrElseGet(
-            "-cogroup-merge",
-            builder,
-            CogroupedKStreamImpl.MERGE_NAME);
+                "-cogroup-merge",
+                builder,
+                CogroupedKStreamImpl.MERGE_NAME);
         final ProcessorSupplier<K, VOut> passThrough = new PassThrough<>();
         final ProcessorGraphNode<K, VOut> mergeNode =
-            new ProcessorGraphNode<>(mergeProcessorName, new ProcessorParameters<>(passThrough, mergeProcessorName));
+                new ProcessorGraphNode<>(mergeProcessorName, new ProcessorParameters<>(passThrough, mergeProcessorName));
 
         builder.addGraphNode(processors, mergeNode);
 
         return new KTableImpl<KR, VIn, VOut>(
-            mergeProcessorName,
-            keySerde,
-            valueSerde,
-            Collections.singleton(mergeNode.nodeName()),
-            queryableName,
-            passThrough,
-            mergeNode,
-            builder);
+                mergeProcessorName,
+                keySerde,
+                valueSerde,
+                Collections.singleton(mergeNode.nodeName()),
+                queryableName,
+                passThrough,
+                mergeNode,
+                builder);
     }
 
-    private <W extends Window> StatefulProcessorNode<K, ?> getStatefulProcessorNode(final Aggregator<? super K, Object, VOut> aggregator,
-                                                                                    final Initializer<VOut> initializer,
+    private <W extends Window> StatefulProcessorNode<K, ?> getStatefulProcessorNode(final Initializer<VOut> initializer,

Review comment:
       We can remove the `<W extends Window>` now, right? Also it looks like the `initializer` input isn't needed anymore either

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
##########
@@ -85,68 +163,39 @@
         groupedStreams.remove(kGrouped);
         kGrouped.ensureCopartitionWith(groupedStreams);
 
-        final Collection<StreamsGraphNode> processors = new ArrayList<>();
-        boolean stateCreated = false;
-        int counter = 0;
-        for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
-            final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
-                kGroupedStream.getValue(),
-                initializer,
-                named.suffixWithOrElseGet(
-                    "-cogroup-agg-" + counter++,
-                    builder,
-                    CogroupedKStreamImpl.AGGREGATE_NAME),
-                stateCreated,
-                storeBuilder,
-                windows,
-                sessionWindows,
-                sessionMerger);
-            stateCreated = true;
-            processors.add(statefulProcessorNode);
-            builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
-        }
+    }
+
+    <KR, VIn, W extends Window> KTable<KR, VOut> createTable(final Collection<StreamsGraphNode> processors,

Review comment:
       Can remove the `W extends Window` generic

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
##########
@@ -47,18 +47,96 @@
     CogroupedStreamAggregateBuilder(final InternalStreamsBuilder builder) {
         this.builder = builder;
     }
-
-    <KR, VIn, W extends Window> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
+    <KR, VIn, W extends Window> KTable<KR, VOut> buildNotWindowed(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
                                                        final Initializer<VOut> initializer,
                                                        final NamedInternal named,
                                                        final StoreBuilder<?> storeBuilder,
                                                        final Serde<KR> keySerde,
                                                        final Serde<VOut> valueSerde,
-                                                       final String queryableName,
-                                                       final Windows<W> windows,
-                                                       final SessionWindows sessionWindows,
-                                                       final Merger<? super K, VOut> sessionMerger) {
+                                                       final String queryableName) {
+        build(groupPatterns, storeBuilder);
+        final Collection<StreamsGraphNode> processors = new ArrayList<>();
+        boolean stateCreated = false;
+        int counter = 0;
+        for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
+            final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
+                    initializer,
+                    named.suffixWithOrElseGet(
+                            "-cogroup-agg-" + counter++,
+                            builder,
+                            CogroupedKStreamImpl.AGGREGATE_NAME),
+                    stateCreated,
+                    storeBuilder,
+                    new KStreamAggregate<>(storeBuilder.name(), initializer, kGroupedStream.getValue()));
+            stateCreated = true;
+            processors.add(statefulProcessorNode);
+            builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
+        }
+        return createTable(processors, named, keySerde, valueSerde, queryableName);
+    }
+
+    <KR, VIn, W extends Window> KTable<KR, VOut> buildTimeWindows(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,

Review comment:
       Honestly I think it's fine to just name all three of these `build`, since they accept different parameters and it should be pretty clear from the context whether it's windowed or not. But being more descriptive is never a bad thing either. Your call 🙂 
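       A small sketch of the overloading idea: three methods all named `build`, distinguished only by their parameter lists, so the compiler picks the variant from the argument types at each call site. `FixedWindows`, `SessionGap`, and `SessionMerger` are hypothetical stand-ins (not the Kafka `Windows`, `SessionWindows`, or `Merger` types), and the returned strings only record which overload ran.

```java
// Hypothetical stand-ins; these are NOT the Kafka Streams Windows/SessionWindows/Merger types.
final class OverloadedBuildSketch {

    static final class FixedWindows {
        final long sizeMs;

        FixedWindows(final long sizeMs) {
            this.sizeMs = sizeMs;
        }
    }

    static final class SessionGap {
        final long gapMs;

        SessionGap(final long gapMs) {
            this.gapMs = gapMs;
        }
    }

    interface SessionMerger {
        long merge(long left, long right);
    }

    // Non-windowed variant: no window parameters at all.
    static String build(final String queryableName) {
        return "plain:" + queryableName;
    }

    // Time-windowed variant: chosen whenever a FixedWindows argument is passed.
    static String build(final String queryableName, final FixedWindows windows) {
        return "time:" + queryableName + ":" + windows.sizeMs;
    }

    // Session-windowed variant: chosen by the (SessionGap, SessionMerger) pair.
    // merge(1, 2) is called only to show that the merger is wired through.
    static String build(final String queryableName, final SessionGap sessionWindows, final SessionMerger merger) {
        return "session:" + queryableName + ":" + sessionWindows.gapMs + ":" + merger.merge(1L, 2L);
    }

    public static void main(final String[] args) {
        System.out.println(build("counts"));
        System.out.println(build("counts", new FixedWindows(1000L)));
        System.out.println(build("counts", new SessionGap(300L), (left, right) -> left + right));
    }
}
```

       The trade-off is the one raised in the comment: names like `buildTimeWindows` document themselves at the call site, while overloads rely on the argument types to say which variant is meant.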

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
##########
@@ -47,18 +47,96 @@
     CogroupedStreamAggregateBuilder(final InternalStreamsBuilder builder) {
         this.builder = builder;
     }
-
-    <KR, VIn, W extends Window> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
+    <KR, VIn, W extends Window> KTable<KR, VOut> buildNotWindowed(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,

Review comment:
       Also, you can remove the `VIn` and `W extends Window` generics on this method.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
##########
@@ -47,18 +47,96 @@
     CogroupedStreamAggregateBuilder(final InternalStreamsBuilder builder) {
         this.builder = builder;
     }
-
-    <KR, VIn, W extends Window> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
+    <KR, VIn, W extends Window> KTable<KR, VOut> buildNotWindowed(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
                                                        final Initializer<VOut> initializer,
                                                        final NamedInternal named,
                                                        final StoreBuilder<?> storeBuilder,
                                                        final Serde<KR> keySerde,
                                                        final Serde<VOut> valueSerde,
-                                                       final String queryableName,
-                                                       final Windows<W> windows,
-                                                       final SessionWindows sessionWindows,
-                                                       final Merger<? super K, VOut> sessionMerger) {
+                                                       final String queryableName) {
+        build(groupPatterns, storeBuilder);
+        final Collection<StreamsGraphNode> processors = new ArrayList<>();
+        boolean stateCreated = false;
+        int counter = 0;
+        for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
+            final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
+                    initializer,
+                    named.suffixWithOrElseGet(
+                            "-cogroup-agg-" + counter++,
+                            builder,
+                            CogroupedKStreamImpl.AGGREGATE_NAME),
+                    stateCreated,
+                    storeBuilder,
+                    new KStreamAggregate<>(storeBuilder.name(), initializer, kGroupedStream.getValue()));
+            stateCreated = true;
+            processors.add(statefulProcessorNode);
+            builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
+        }
+        return createTable(processors, named, keySerde, valueSerde, queryableName);

Review comment:
       This is all the same for all three methods except for the `KStreamAggregate`/`KStreamWindowAggregate`/etc., right? I think if you wanted to further deduplicate things, you could factor this out into a method that accepts a `Function<KGroupedStreamImpl, KStreamAggProcessorSupplier>`, and then each of the `build` methods can just pass in a function that returns `new KStreamWindowAggregate` and so on.
   I'm not sure it's really worth it, but it can be done in case you were wondering. Up to you whether you want to do it.
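   For reference, a rough, self-contained sketch of the `Function`-based deduplication described above. It does not use the real Kafka Streams internals: `AggProcessorSupplier`, `Node`, `buildProcessors`, and the three `build*` helpers are hypothetical stand-ins, string labels replace the real aggregators and suppliers, and graph wiring and processor naming are simplified away. One deviation from the comment: the factory here receives the per-stream aggregator (the map value) rather than the `KGroupedStreamImpl` key, since the aggregator is the only per-iteration input the three supplier constructors in the diff take.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, simplified model of the cogroup builder; none of these types are Kafka Streams classes.
final class CogroupDedupSketch {

    // Stand-in for KStreamAggProcessorSupplier (KStreamAggregate, KStreamWindowAggregate, ...).
    interface AggProcessorSupplier {
        String describe();
    }

    // Stand-in for StatefulProcessorNode: only the first node owns the store builder.
    static final class Node {
        final String name;
        final boolean ownsStore;
        final String processor;

        Node(final String name, final boolean ownsStore, final String processor) {
            this.name = name;
            this.ownsStore = ownsStore;
            this.processor = processor;
        }
    }

    // The shared loop. Keys play the role of KGroupedStreamImpl, values the per-stream aggregator;
    // only the supplier construction differs between variants, so it is passed in as a Function.
    static <G, A> List<Node> buildProcessors(final Map<G, A> groupPatterns,
                                             final Function<A, AggProcessorSupplier> supplierFactory) {
        final List<Node> processors = new ArrayList<>();
        boolean stateCreated = false;
        int counter = 0;
        for (final Map.Entry<G, A> entry : groupPatterns.entrySet()) {
            final AggProcessorSupplier supplier = supplierFactory.apply(entry.getValue());
            // Simplified naming; the real code uses named.suffixWithOrElseGet(...).
            processors.add(new Node("-cogroup-agg-" + counter++, !stateCreated, supplier.describe()));
            stateCreated = true;
            // The real code would also call builder.addGraphNode(parent, node) here.
        }
        return processors;
    }

    static List<Node> buildNotWindowed(final Map<String, String> groupPatterns) {
        return buildProcessors(groupPatterns, aggregator -> () -> "aggregate(" + aggregator + ")");
    }

    static List<Node> buildTimeWindows(final Map<String, String> groupPatterns, final long windowSizeMs) {
        return buildProcessors(groupPatterns,
            aggregator -> () -> "windowAggregate(" + windowSizeMs + "ms, " + aggregator + ")");
    }

    static List<Node> buildSessionWindows(final Map<String, String> groupPatterns, final long inactivityGapMs) {
        return buildProcessors(groupPatterns,
            aggregator -> () -> "sessionAggregate(" + inactivityGapMs + "ms, " + aggregator + ")");
    }

    public static void main(final String[] args) {
        final Map<String, String> patterns = new LinkedHashMap<>();
        patterns.put("clicks", "sumClicks");
        patterns.put("views", "sumViews");
        for (final Node node : buildTimeWindows(patterns, 5000L)) {
            System.out.println(node.name + " ownsStore=" + node.ownsStore + " " + node.processor);
        }
    }
}
```

   Only the lambda differs between the three variants; the `stateCreated` bookkeeping and the counter live in one place. Whether that is worth the extra level of indirection is the judgment call mentioned in the comment.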

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
##########
@@ -47,18 +47,96 @@
     CogroupedStreamAggregateBuilder(final InternalStreamsBuilder builder) {
         this.builder = builder;
     }
-
-    <KR, VIn, W extends Window> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
+    <KR, VIn, W extends Window> KTable<KR, VOut> buildNotWindowed(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
                                                        final Initializer<VOut> initializer,
                                                        final NamedInternal named,
                                                        final StoreBuilder<?> storeBuilder,
                                                        final Serde<KR> keySerde,
                                                        final Serde<VOut> valueSerde,
-                                                       final String queryableName,
-                                                       final Windows<W> windows,
-                                                       final SessionWindows sessionWindows,
-                                                       final Merger<? super K, VOut> sessionMerger) {
+                                                       final String queryableName) {
+        build(groupPatterns, storeBuilder);
+        final Collection<StreamsGraphNode> processors = new ArrayList<>();
+        boolean stateCreated = false;
+        int counter = 0;
+        for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
+            final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
+                    initializer,
+                    named.suffixWithOrElseGet(
+                            "-cogroup-agg-" + counter++,
+                            builder,
+                            CogroupedKStreamImpl.AGGREGATE_NAME),
+                    stateCreated,
+                    storeBuilder,
+                    new KStreamAggregate<>(storeBuilder.name(), initializer, kGroupedStream.getValue()));
+            stateCreated = true;
+            processors.add(statefulProcessorNode);
+            builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
+        }
+        return createTable(processors, named, keySerde, valueSerde, queryableName);
+    }
+
+    <KR, VIn, W extends Window> KTable<KR, VOut> buildTimeWindows(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,

Review comment:
       Don't need `VIn` here.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
##########
@@ -47,18 +47,96 @@
     CogroupedStreamAggregateBuilder(final InternalStreamsBuilder builder) {
         this.builder = builder;
     }
-
-    <KR, VIn, W extends Window> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
+    <KR, VIn, W extends Window> KTable<KR, VOut> buildNotWindowed(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
                                                        final Initializer<VOut> initializer,
                                                        final NamedInternal named,
                                                        final StoreBuilder<?> storeBuilder,
                                                        final Serde<KR> keySerde,
                                                        final Serde<VOut> valueSerde,
-                                                       final String queryableName,
-                                                       final Windows<W> windows,
-                                                       final SessionWindows sessionWindows,
-                                                       final Merger<? super K, VOut> sessionMerger) {
+                                                       final String queryableName) {
+        build(groupPatterns, storeBuilder);
+        final Collection<StreamsGraphNode> processors = new ArrayList<>();
+        boolean stateCreated = false;
+        int counter = 0;
+        for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
+            final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
+                    initializer,
+                    named.suffixWithOrElseGet(
+                            "-cogroup-agg-" + counter++,
+                            builder,
+                            CogroupedKStreamImpl.AGGREGATE_NAME),
+                    stateCreated,
+                    storeBuilder,
+                    new KStreamAggregate<>(storeBuilder.name(), initializer, kGroupedStream.getValue()));
+            stateCreated = true;
+            processors.add(statefulProcessorNode);
+            builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
+        }
+        return createTable(processors, named, keySerde, valueSerde, queryableName);
+    }
+
+    <KR, VIn, W extends Window> KTable<KR, VOut> buildTimeWindows(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
+                                                              final Initializer<VOut> initializer,
+                                                              final NamedInternal named,
+                                                              final StoreBuilder<?> storeBuilder,
+                                                              final Serde<KR> keySerde,
+                                                              final Serde<VOut> valueSerde,
+                                                              final String queryableName,
+                                                                  final Windows<W> windows) {
+        build(groupPatterns, storeBuilder);
 
+        final Collection<StreamsGraphNode> processors = new ArrayList<>();
+        boolean stateCreated = false;
+        int counter = 0;
+        for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
+            final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
+                    initializer,
+                    named.suffixWithOrElseGet(
+                            "-cogroup-agg-" + counter++,
+                            builder,
+                            CogroupedKStreamImpl.AGGREGATE_NAME),
+                    stateCreated,
+                    storeBuilder,
+                    new KStreamWindowAggregate<>(windows, storeBuilder.name(), initializer, kGroupedStream.getValue()));
+            stateCreated = true;
+            processors.add(statefulProcessorNode);
+            builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
+        }
+        return createTable(processors, named, keySerde, valueSerde, queryableName);
+    }
+
+    <KR, VIn, W extends Window> KTable<KR, VOut> buildSessionWindows(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,

Review comment:
       Don't need `VIn` and `W extends Window` here.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
##########
@@ -47,18 +47,96 @@
     CogroupedStreamAggregateBuilder(final InternalStreamsBuilder builder) {
         this.builder = builder;
     }
-
-    <KR, VIn, W extends Window> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
+    <KR, VIn, W extends Window> KTable<KR, VOut> buildNotWindowed(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
                                                        final Initializer<VOut> initializer,
                                                        final NamedInternal named,
                                                        final StoreBuilder<?> storeBuilder,
                                                        final Serde<KR> keySerde,
                                                        final Serde<VOut> valueSerde,
-                                                       final String queryableName,
-                                                       final Windows<W> windows,
-                                                       final SessionWindows sessionWindows,
-                                                       final Merger<? super K, VOut> sessionMerger) {
+                                                       final String queryableName) {
+        build(groupPatterns, storeBuilder);
+        final Collection<StreamsGraphNode> processors = new ArrayList<>();
+        boolean stateCreated = false;
+        int counter = 0;
+        for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
+            final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
+                    initializer,
+                    named.suffixWithOrElseGet(
+                            "-cogroup-agg-" + counter++,
+                            builder,
+                            CogroupedKStreamImpl.AGGREGATE_NAME),
+                    stateCreated,
+                    storeBuilder,
+                    new KStreamAggregate<>(storeBuilder.name(), initializer, kGroupedStream.getValue()));
+            stateCreated = true;
+            processors.add(statefulProcessorNode);
+            builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
+        }
+        return createTable(processors, named, keySerde, valueSerde, queryableName);
+    }
+
+    <KR, VIn, W extends Window> KTable<KR, VOut> buildTimeWindows(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
+                                                              final Initializer<VOut> initializer,
+                                                              final NamedInternal named,
+                                                              final StoreBuilder<?> storeBuilder,
+                                                              final Serde<KR> keySerde,
+                                                              final Serde<VOut> valueSerde,
+                                                              final String queryableName,
+                                                                  final Windows<W> windows) {
+        build(groupPatterns, storeBuilder);
 
+        final Collection<StreamsGraphNode> processors = new ArrayList<>();
+        boolean stateCreated = false;
+        int counter = 0;
+        for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
+            final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
+                    initializer,
+                    named.suffixWithOrElseGet(
+                            "-cogroup-agg-" + counter++,
+                            builder,
+                            CogroupedKStreamImpl.AGGREGATE_NAME),
+                    stateCreated,
+                    storeBuilder,
+                    new KStreamWindowAggregate<>(windows, storeBuilder.name(), initializer, kGroupedStream.getValue()));
+            stateCreated = true;
+            processors.add(statefulProcessorNode);
+            builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
+        }
+        return createTable(processors, named, keySerde, valueSerde, queryableName);
+    }
+
+    <KR, VIn, W extends Window> KTable<KR, VOut> buildSessionWindows(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
+                                                                  final Initializer<VOut> initializer,
+                                                                  final NamedInternal named,
+                                                                  final StoreBuilder<?> storeBuilder,
+                                                                  final Serde<KR> keySerde,
+                                                                  final Serde<VOut> valueSerde,
+                                                                  final String queryableName,
+                                                                  final SessionWindows sessionWindows,
+                                                                     final Merger<? super K, VOut> sessionMerger) {
+        build(groupPatterns, storeBuilder);
+        final Collection<StreamsGraphNode> processors = new ArrayList<>();
+        boolean stateCreated = false;
+        int counter = 0;
+        for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
+            final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
+                    initializer,
+                    named.suffixWithOrElseGet(
+                            "-cogroup-agg-" + counter++,
+                            builder,
+                            CogroupedKStreamImpl.AGGREGATE_NAME),
+                    stateCreated,
+                    storeBuilder,
+                    new KStreamSessionWindowAggregate<>(sessionWindows, storeBuilder.name(), initializer, kGroupedStream.getValue(), sessionMerger));
+            stateCreated = true;
+            processors.add(statefulProcessorNode);
+            builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
+        }
+        return createTable(processors, named, keySerde, valueSerde, queryableName);
+    }
+
+    private void build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
+                                                       final StoreBuilder<?> storeBuilder) {

Review comment:
       Can we call this something like `ensureCopartitioning` or `processRepartitions` or something? My take is that the copartitioning is the main point of this method so that's probably good to include in the name




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org