Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/08/04 22:40:50 UTC

[GitHub] [kafka] ableegoldman commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

ableegoldman commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r465369865



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
##########
@@ -132,16 +135,19 @@
                                                                                     final boolean stateCreated,
                                                                                     final StoreBuilder<?> storeBuilder,
                                                                                     final Windows<W> windows,
+                                                                                    final SlidingWindows slidingWindows,
                                                                                     final SessionWindows sessionWindows,
                                                                                     final Merger<? super K, VOut> sessionMerger) {
 
         final ProcessorSupplier<K, ?> kStreamAggregate;
 
-        if (windows == null && sessionWindows == null) {
+        if (windows == null && slidingWindows == null && sessionWindows == null) {
             kStreamAggregate = new KStreamAggregate<>(storeBuilder.name(), initializer, aggregator);
-        } else if (windows != null && sessionWindows == null) {
+        } else if (windows != null && slidingWindows == null && sessionWindows == null) {
             kStreamAggregate = new KStreamWindowAggregate<>(windows, storeBuilder.name(), initializer, aggregator);
-        } else if (windows == null && sessionMerger != null) {
+        } else if (windows == null && slidingWindows != null && sessionWindows == null) {
+            kStreamAggregate = new KStreamSlidingWindowAggregate<>(slidingWindows, storeBuilder.name(), initializer, aggregator);
+        } else if (windows == null && slidingWindows == null && sessionMerger != null) {

Review comment:
       @mjsax why do we have a single method that accepts all three window types and then checks them all individually to enforce that only one type of window is actually "set"? Seems like we could enforce this implicitly by having a separate method for time, session, and non-windowed aggregates and then just calling the correct signature, i.e. `SessionWindowedCogroupedKStreamImpl` calls `build(...SessionWindows, SessionMerger)` and so on.
   
   Maybe I'm missing something here because I wasn't following the cogroup KIP that closely, but is this even exposed to the user in any way? My understanding is that there's no way for this check to be violated by any kind of user input, because this method is only ever called directly by Streams internal code with `null` hardcoded for the unused window types.
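
A rough sketch of the overload-per-window-type idea, to make the suggestion concrete. Everything here is a hypothetical stand-in: `BuildOverloadSketch`, the nested window classes, and the `String` return value are illustration only, not the real `CogroupedStreamAggregateBuilder` signatures, and the sliding-window overload is an assumption about how the new type would fit in. The point is just that each caller picks its overload, so no combination of `null` arguments needs to be checked.

```java
import java.util.Objects;

// Hypothetical stand-ins for illustration; NOT the real Kafka Streams classes.
final class BuildOverloadSketch {

    static final class TimeWindows { }
    static final class SlidingWindows { }
    static final class SessionWindows { }

    // Stand-in for the session merger used by session-windowed aggregates.
    interface Merger<K, V> {
        V apply(K key, V left, V right);
    }

    // Non-windowed aggregate: no window argument exists, so nothing to null-check.
    static String build(final String storeName) {
        return "KStreamAggregate(" + storeName + ")";
    }

    // Time-windowed aggregate.
    static String build(final String storeName, final TimeWindows windows) {
        Objects.requireNonNull(windows, "windows");
        return "KStreamWindowAggregate(" + storeName + ")";
    }

    // Sliding-windowed aggregate (assumed to get its own overload as well).
    static String build(final String storeName, final SlidingWindows windows) {
        Objects.requireNonNull(windows, "windows");
        return "KStreamSlidingWindowAggregate(" + storeName + ")";
    }

    // Session-windowed aggregate: the merger is only part of this signature.
    static <K, V> String build(final String storeName,
                               final SessionWindows windows,
                               final Merger<K, V> sessionMerger) {
        Objects.requireNonNull(windows, "windows");
        Objects.requireNonNull(sessionMerger, "sessionMerger");
        return "KStreamSessionWindowAggregate(" + storeName + ")";
    }

    public static void main(final String[] args) {
        final Merger<String, Long> merger = (key, left, right) -> left + right;
        System.out.println(build("store"));
        System.out.println(build("store", new TimeWindows()));
        System.out.println(build("store", new SlidingWindows()));
        System.out.println(build("store", new SessionWindows(), merger));
    }
}
```

With this shape, passing two window types at once is not expressible, so the chain of `== null` / `!= null` checks in the diff above would have nothing left to guard.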



