You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/08/04 14:03:11 UTC

[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r465073439



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
##########
@@ -132,16 +135,19 @@
                                                                                     final boolean stateCreated,
                                                                                     final StoreBuilder<?> storeBuilder,
                                                                                     final Windows<W> windows,
+                                                                                    final SlidingWindows slidingWindows,
                                                                                     final SessionWindows sessionWindows,
                                                                                     final Merger<? super K, VOut> sessionMerger) {
 
         final ProcessorSupplier<K, ?> kStreamAggregate;
 
-        if (windows == null && sessionWindows == null) {
+        if (windows == null && slidingWindows == null && sessionWindows == null) {
             kStreamAggregate = new KStreamAggregate<>(storeBuilder.name(), initializer, aggregator);
-        } else if (windows != null && sessionWindows == null) {
+        } else if (windows != null && slidingWindows == null && sessionWindows == null) {
             kStreamAggregate = new KStreamWindowAggregate<>(windows, storeBuilder.name(), initializer, aggregator);
-        } else if (windows == null && sessionMerger != null) {
+        } else if (windows == null && slidingWindows != null && sessionWindows == null) {
+            kStreamAggregate = new KStreamSlidingWindowAggregate<>(slidingWindows, storeBuilder.name(), initializer, aggregator);
+        } else if (windows == null && slidingWindows == null && sessionMerger != null) {

Review comment:
       Would checking both be redundant? It looks like the method that ultimately calls this one will check that sessionMerger is not null for session windows, so I think either both of these will be null or neither will be null




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org