Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/24 08:46:01 UTC

[GitHub] [kafka] showuon opened a new pull request #11124: [WIP] KAFKA-12839: use sessionWindow to represent SlidingWindows

showuon opened a new pull request #11124:
URL: https://github.com/apache/kafka/pull/11124


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on pull request #11124: [WIP] KAFKA-12839: use sessionWindow to represent SlidingWindows

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#issuecomment-887994503


   It seems super awkward and likely to lead to confusion/future mistakes to use something called a `"SessionWindow"` in the "SlidingWindowAggregate", although yeah, it's pretty much exactly the same otherwise and can be reused.
   
   In other words, I totally agree with your proposal to just rename the existing Window implementations to describe the actual interval they represent, rather than some specific type of windowed operation that just happens to use them at the moment. (In fact I had written that first paragraph before I even saw your comment with the renaming proposal, great minds think alike huh 😜)
   
   That said, those names are just super clunky. Imagine trying to code something up with that...just takes too much mental processing. Maybe it's my inner physicist, but sometimes mathematical precision just isn't appropriate for real-world usage (don't tell any mathematicians I said that!) Unfortunately I'm not crazy about any of the alternatives I can think up, maybe you can come up with some better ideas. Here's the best I could come up with:
   
   `TimeWindow` --> `InclusiveExclusiveWindow` 
   `SessionWindow` / `SlidingWindow` --> `InclusiveInclusiveWindow`
   `UnlimitedWindow` --> `InclusiveUnboundedWindow`
   
   To me at least these feel more natural, ie it's clear what they mean without having to reference Wikipedia. I mean most people probably do know what open/closed mean, but inclusive/exclusive is more to the point. Also I think we can drop left/right and just imply it by the ordering. Thoughts? 
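To make the proposed names concrete, here is a minimal sketch of the three bound conventions as plain Java classes. The class names follow the renaming suggested above and are hypothetical, and `NamedWindow`/`contains` are illustrative helpers, not the real Kafka Streams `Window` API. The key difference is that a half-open window with start == end would be empty, so it has to be rejected, while a closed window of size 0 still contains exactly one timestamp.

```java
// Hypothetical classes illustrating the proposed names; not part of Kafka Streams.
class WindowNamesDemo {
    public static void main(final String[] args) {
        // Half-open [0, 10): the end timestamp 10 is excluded.
        System.out.println(new InclusiveExclusiveWindow(0L, 10L).contains(10L));
        // Closed [5, 5]: a zero-size window still contains its single timestamp.
        System.out.println(new InclusiveInclusiveWindow(5L, 5L).contains(5L));
        // [100, +inf): every timestamp from 100 onwards is included.
        System.out.println(new InclusiveUnboundedWindow(100L).contains(Long.MAX_VALUE));
    }
}

abstract class NamedWindow {
    final long startMs;
    final long endMs;

    NamedWindow(final long startMs, final long endMs) {
        if (startMs < 0) {
            throw new IllegalArgumentException("startMs must not be negative");
        }
        this.startMs = startMs;
        this.endMs = endMs;
    }

    abstract boolean contains(long timestampMs);
}

// [start, end): the TimeWindow semantics, so end must be strictly greater than start.
class InclusiveExclusiveWindow extends NamedWindow {
    InclusiveExclusiveWindow(final long startMs, final long endMs) {
        super(startMs, endMs);
        if (endMs <= startMs) {
            throw new IllegalArgumentException("end must be greater than start for [start, end)");
        }
    }

    @Override
    boolean contains(final long ts) {
        return ts >= startMs && ts < endMs;
    }
}

// [start, end]: the SessionWindow semantics; start == end is allowed.
class InclusiveInclusiveWindow extends NamedWindow {
    InclusiveInclusiveWindow(final long startMs, final long endMs) {
        super(startMs, endMs);
        if (endMs < startMs) {
            throw new IllegalArgumentException("end must not be smaller than start for [start, end]");
        }
    }

    @Override
    boolean contains(final long ts) {
        return ts >= startMs && ts <= endMs;
    }
}

// [start, +inf): the UnlimitedWindow semantics, with the end pinned to Long.MAX_VALUE.
class InclusiveUnboundedWindow extends NamedWindow {
    InclusiveUnboundedWindow(final long startMs) {
        super(startMs, Long.MAX_VALUE);
    }

    @Override
    boolean contains(final long ts) {
        return ts >= startMs;
    }
}
```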





[GitHub] [kafka] showuon commented on a change in pull request #11124: KAFKA-12839: use sessionWindow to represent SlidingWindows

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#discussion_r678903334



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -351,7 +351,8 @@ private void processEarly(final K key, final V value, final long inputRecordTime
             }
 
             if (combinedWindow == null) {
-                final TimeWindow window = new TimeWindow(0, windows.timeDifferenceMs());
+                // created a [start, end] time interval window via SessionWindow
+                final SessionWindow window = new SessionWindow(0, windows.timeDifferenceMs());

Review comment:
       add a comment here to avoid confusion.







[GitHub] [kafka] showuon commented on a change in pull request #11124: KAFKA-12839: use sessionWindow to represent SlidingWindows

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#discussion_r679596497



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -100,17 +99,79 @@
 
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
     private final String threadId = Thread.currentThread().getName();
+    private final String topic = "topic";
+    private final String defaultInOrderName = "InOrder";
+    private final String defaultReverseName = "Reverse";
+    private final long defaultWindowSize = 10L;
+    private final long defaultRetentionPeriod = 5000L;
+
+    private WindowBytesStoreSupplier getStoreSupplier(final boolean inOrderIterator,
+                                                      final String inOrderName,
+                                                      final String reverseName,
+                                                      final long windowSize) {
+        return inOrderIterator
+            ? new InOrderMemoryWindowStoreSupplier(inOrderName, defaultRetentionPeriod, windowSize, false)
+            : Stores.inMemoryWindowStore(reverseName, ofMillis(defaultRetentionPeriod), ofMillis(windowSize), false);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testAggregateSmallInputWithZeroTimeDifference() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        // We use CachingWindowStore to store the aggregated values internally, and then use TimeWindow to represent the "windowed KTable"
+        // thus, the window size must be greater than 0 here
+        final WindowBytesStoreSupplier storeSupplier = getStoreSupplier(inOrderIterator, defaultInOrderName, defaultReverseName, 1L);

Review comment:
       > Are you saying the CachingWindowStore internally uses a TimeWindow? 
   
   Yes, `CachingWindowStore` internally uses `TimeWindow` (via `WindowKeySchema`). It looks like we use `TimeWindow` for `WindowStore` and `SessionWindow` for `SessionStore`.
   
   > doesn't this mean there's still a hole in the API since you can't use a custom WindowStore for a sliding windowed aggregation with the windowSize set to 0?
   
   I think so
   
   > If the WindowStore is going to represent different kinds of constant-size windows, it should probably be agnostic to the specific type of constant-sized window.
   
   Do you mean we should use `SessionWindow` (i.e. a [start, end] inclusive time window) to represent the window? I'm not sure whether this was the original design or just an oversight. What do you think?
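A simplified sketch of why the store-side window type matters, assuming (as described above) that the caching layer rebuilds each windowed key from its stored start timestamp plus the store's fixed window size and then wraps the pair in a `TimeWindow`. `SimpleTimeWindow` and `windowForSize` are hypothetical stand-ins, not Kafka's actual `TimeWindow` or `WindowKeySchema` code, and the overflow clamp is an assumption of this sketch. The point is only that a window size of 0 yields start == end, which a half-open window rejects.

```java
// Hypothetical sketch; not the real WindowKeySchema / TimeWindow implementation.
class WindowKeyDemo {
    public static void main(final String[] args) {
        System.out.println(windowForSize(1_000L, 1L));
        try {
            windowForSize(1_000L, 0L);
        } catch (final IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }

    // Rebuild the window from the stored start timestamp and the store's window size.
    static SimpleTimeWindow windowForSize(final long startMs, final long windowSize) {
        long endMs = startMs + windowSize;
        if (endMs < 0) {
            // Assumed overflow guard: clamp the end instead of wrapping around to a negative value.
            endMs = Long.MAX_VALUE;
        }
        return new SimpleTimeWindow(startMs, endMs);
    }
}

// Half-open [start, end) window that, like TimeWindow, refuses start == end.
class SimpleTimeWindow {
    final long startMs;
    final long endMs;

    SimpleTimeWindow(final long startMs, final long endMs) {
        if (startMs < 0) {
            throw new IllegalArgumentException("startMs must not be negative");
        }
        if (endMs <= startMs) {
            throw new IllegalArgumentException("endMs must be greater than startMs");
        }
        this.startMs = startMs;
        this.endMs = endMs;
    }

    @Override
    public String toString() {
        return "[" + startMs + ", " + endMs + ")";
    }
}
```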
   
   







[GitHub] [kafka] mjsax commented on a change in pull request #11124: KAFKA-12839: Let SlidingWindows aggregation support window size of 0

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#discussion_r689950309



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -100,17 +99,78 @@
 
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
     private final String threadId = Thread.currentThread().getName();
+    private final String topic = "topic";
+    private final String defaultInOrderName = "InOrder";
+    private final String defaultReverseName = "Reverse";
+    private final long defaultWindowSize = 10L;
+    private final long defaultRetentionPeriod = 5000L;
+
+    private WindowBytesStoreSupplier getStoreSupplier(final boolean inOrderIterator,
+                                                      final String inOrderName,
+                                                      final String reverseName,
+                                                      final long windowSize) {
+        return inOrderIterator
+            ? new InOrderMemoryWindowStoreSupplier(inOrderName, defaultRetentionPeriod, windowSize, false)
+            : Stores.inMemoryWindowStore(reverseName, ofMillis(defaultRetentionPeriod), ofMillis(windowSize), false);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testAggregateSmallInputWithZeroTimeDifference() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        // We use TimeWindow to represent the "windowed KTable" internally, so, the window size must be greater than 0 here
+        final WindowBytesStoreSupplier storeSupplier = getStoreSupplier(inOrderIterator, defaultInOrderName, defaultReverseName, 1L);

Review comment:
       Size should be `0L` ?

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -100,17 +99,78 @@
 
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
     private final String threadId = Thread.currentThread().getName();
+    private final String topic = "topic";
+    private final String defaultInOrderName = "InOrder";
+    private final String defaultReverseName = "Reverse";
+    private final long defaultWindowSize = 10L;
+    private final long defaultRetentionPeriod = 5000L;
+
+    private WindowBytesStoreSupplier getStoreSupplier(final boolean inOrderIterator,
+                                                      final String inOrderName,
+                                                      final String reverseName,
+                                                      final long windowSize) {
+        return inOrderIterator
+            ? new InOrderMemoryWindowStoreSupplier(inOrderName, defaultRetentionPeriod, windowSize, false)
+            : Stores.inMemoryWindowStore(reverseName, ofMillis(defaultRetentionPeriod), ofMillis(windowSize), false);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testAggregateSmallInputWithZeroTimeDifference() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        // We use TimeWindow to represent the "windowed KTable" internally, so, the window size must be greater than 0 here

Review comment:
       Seems this comment needs to be updated?







[GitHub] [kafka] ableegoldman commented on a change in pull request #11124: KAFKA-12839: use sessionWindow to represent SlidingWindows

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#discussion_r679607519



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -100,17 +99,79 @@
 
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
     private final String threadId = Thread.currentThread().getName();
+    private final String topic = "topic";
+    private final String defaultInOrderName = "InOrder";
+    private final String defaultReverseName = "Reverse";
+    private final long defaultWindowSize = 10L;
+    private final long defaultRetentionPeriod = 5000L;
+
+    private WindowBytesStoreSupplier getStoreSupplier(final boolean inOrderIterator,
+                                                      final String inOrderName,
+                                                      final String reverseName,
+                                                      final long windowSize) {
+        return inOrderIterator
+            ? new InOrderMemoryWindowStoreSupplier(inOrderName, defaultRetentionPeriod, windowSize, false)
+            : Stores.inMemoryWindowStore(reverseName, ofMillis(defaultRetentionPeriod), ofMillis(windowSize), false);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testAggregateSmallInputWithZeroTimeDifference() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        // We use CachingWindowStore to store the aggregated values internally, and then use TimeWindow to represent the "windowed KTable"
+        // thus, the window size must be greater than 0 here
+        final WindowBytesStoreSupplier storeSupplier = getStoreSupplier(inOrderIterator, defaultInOrderName, defaultReverseName, 1L);

Review comment:
       > Do you mean we should use SessionWindow (i.e. [start, end] inclusive time window) to represent the window?
   
   Not for the WindowStore (SessionWindow is fine for SessionStore of course), I'm saying if WindowStore is used for both inclusive-exclusive and also inclusive-inclusive windows, then we shouldn't ever assume one of them ie should not use an actual `TimeWindow` (or `InclusiveExclusiveWindow`) -- maybe we can just have a separate, plain window class that doesn't assume anything about its bounds.
   
   We can even just (re)use the `TimeWindow` for this if we do the renaming as discussed, and move the check that prevents size = 0 to that new class. Then we can just have a plain data container class `TimeWindow` that does nothing but hold the start and end time for use in window-agnostic cases like the CachingWindowStore. Does that make sense?
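One way to read this suggestion, as a minimal sketch: a plain container that only stores start and end, with the "size must be > 0" validation moved into a subclass that carries the half-open semantics. The names `WindowBounds` and `HalfOpenWindow` are hypothetical placeholders for whatever the renamed classes end up being; this is not the code in the PR.

```java
// Hypothetical sketch of the proposed split; not Kafka Streams code.
class ContainerDemo {
    public static void main(final String[] args) {
        // Window-agnostic code (e.g. a caching layer) can hold a zero-size window...
        final WindowBounds zeroSize = new WindowBounds(50L, 50L);
        System.out.println(zeroSize);
        // ...while the half-open window type still enforces end > start.
        try {
            new HalfOpenWindow(50L, 50L);
        } catch (final IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}

// Plain data holder: makes no assumption about whether the bounds are inclusive or exclusive.
class WindowBounds {
    final long startMs;
    final long endMs;

    WindowBounds(final long startMs, final long endMs) {
        if (startMs < 0) {
            throw new IllegalArgumentException("startMs must not be negative");
        }
        if (endMs < startMs) {
            throw new IllegalArgumentException("endMs must not be smaller than startMs");
        }
        this.startMs = startMs;
        this.endMs = endMs;
    }

    @Override
    public String toString() {
        return "WindowBounds{start=" + startMs + ", end=" + endMs + "}";
    }
}

// Half-open [start, end): the only place where the size > 0 check lives.
class HalfOpenWindow extends WindowBounds {
    HalfOpenWindow(final long startMs, final long endMs) {
        super(startMs, endMs);
        if (startMs == endMs) {
            throw new IllegalArgumentException("[start, end) requires end > start");
        }
    }
}
```

The design choice here is that validation belongs to the type that defines the bound semantics, so the container stays usable wherever the bound convention is unknown.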







[GitHub] [kafka] ableegoldman commented on a change in pull request #11124: KAFKA-12839: Let SlidingWindows aggregation support window size of 0

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#discussion_r691646220



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -100,17 +99,78 @@
 
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
     private final String threadId = Thread.currentThread().getName();
+    private final String topic = "topic";
+    private final String defaultInOrderName = "InOrder";
+    private final String defaultReverseName = "Reverse";
+    private final long defaultWindowSize = 10L;
+    private final long defaultRetentionPeriod = 5000L;
+
+    private WindowBytesStoreSupplier getStoreSupplier(final boolean inOrderIterator,
+                                                      final String inOrderName,
+                                                      final String reverseName,
+                                                      final long windowSize) {
+        return inOrderIterator
+            ? new InOrderMemoryWindowStoreSupplier(inOrderName, defaultRetentionPeriod, windowSize, false)
+            : Stores.inMemoryWindowStore(reverseName, ofMillis(defaultRetentionPeriod), ofMillis(windowSize), false);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testAggregateSmallInputWithZeroTimeDifference() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        // We use TimeWindow to represent the "windowed KTable" internally, so, the window size must be greater than 0 here
+        final WindowBytesStoreSupplier storeSupplier = getStoreSupplier(inOrderIterator, defaultInOrderName, defaultReverseName, 1L);

Review comment:
       Seems ok to me to wait and fix alongside other issues like KIP-300 in a "new and improved" DSL (or whatever we do there). If users start to complain and request a fix sooner then we can re-evaluate, but it's not like this was a user-reported bug to begin with.







[GitHub] [kafka] mjsax commented on a change in pull request #11124: KAFKA-12839: Let SlidingWindows aggregation support window size of 0

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#discussion_r690013977



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -100,17 +99,78 @@
 
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
     private final String threadId = Thread.currentThread().getName();
+    private final String topic = "topic";
+    private final String defaultInOrderName = "InOrder";
+    private final String defaultReverseName = "Reverse";
+    private final long defaultWindowSize = 10L;
+    private final long defaultRetentionPeriod = 5000L;
+
+    private WindowBytesStoreSupplier getStoreSupplier(final boolean inOrderIterator,
+                                                      final String inOrderName,
+                                                      final String reverseName,
+                                                      final long windowSize) {
+        return inOrderIterator
+            ? new InOrderMemoryWindowStoreSupplier(inOrderName, defaultRetentionPeriod, windowSize, false)
+            : Stores.inMemoryWindowStore(reverseName, ofMillis(defaultRetentionPeriod), ofMillis(windowSize), false);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testAggregateSmallInputWithZeroTimeDifference() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        // We use TimeWindow to represent the "windowed KTable" internally, so, the window size must be greater than 0 here
+        final WindowBytesStoreSupplier storeSupplier = getStoreSupplier(inOrderIterator, defaultInOrderName, defaultReverseName, 1L);

Review comment:
   Hmmm... Seems to be an issue... The actual final return type is `KTable<Windowed<K>, V>` and thus is window-type agnostic. So we already have such a "container". However, `windowedBy(SlidingWindows)` returns a `TimeWindowedKStream`... Return types are not easy to change... And I don't think we can just switch from `TimeWindow` to `SlidingWindow` as the concrete type for the sliding window case either...
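To illustrate why a `Windowed<K>`-style result key is window-type agnostic, here is a minimal sketch: a key wrapper that holds a record key plus any window subtype, so downstream code only reads start and end. `WindowedKey`, `Span`, `HalfOpenSpan`, and `ClosedSpan` are hypothetical names that only mimic the role of Kafka's `Windowed<K>` and `Window`; the sketch does not model the `TimeWindowedKStream` return-type problem, which is about the DSL signature rather than the key.

```java
// Hypothetical sketch; mimics the role of Windowed<K>/Window but is not Kafka code.
class WindowedKeyDemo {
    public static void main(final String[] args) {
        System.out.println(describe(new WindowedKey<>("user-1", new HalfOpenSpan(0L, 10L))));
        System.out.println(describe(new WindowedKey<>("user-1", new ClosedSpan(5L, 5L))));
    }

    // Downstream code only needs start/end, so it works for any Span subtype.
    static <K> String describe(final WindowedKey<K> windowedKey) {
        return windowedKey.key + "@" + windowedKey.window.startMs + "-" + windowedKey.window.endMs;
    }
}

abstract class Span {
    final long startMs;
    final long endMs;

    Span(final long startMs, final long endMs) {
        this.startMs = startMs;
        this.endMs = endMs;
    }
}

// [start, end): rejects zero-size windows.
class HalfOpenSpan extends Span {
    HalfOpenSpan(final long startMs, final long endMs) {
        super(startMs, endMs);
        if (endMs <= startMs) {
            throw new IllegalArgumentException("[start, end) requires end > start");
        }
    }
}

// [start, end]: allows zero-size windows, as a size-0 sliding window would need.
class ClosedSpan extends Span {
    ClosedSpan(final long startMs, final long endMs) {
        super(startMs, endMs);
        if (endMs < startMs) {
            throw new IllegalArgumentException("[start, end] requires end >= start");
        }
    }
}

final class WindowedKey<K> {
    final K key;
    final Span window;

    WindowedKey(final K key, final Span window) {
        this.key = key;
        this.window = window;
    }
}
```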
   
   Maybe we are stuck and cannot fix the bug without a breaking change? For this case, we would indeed need to carry on with the KIP (but we could only do it in 4.0...), but I am wondering if it's worth fixing given the impact?
   
   Also: we have a few issues with the current DSL that we cannot fix easily (e.g. KIP-300). Thus, a long-term solution could be to leave the current API as-is and build a new DSL in parallel (we did this in the past when we introduced `StreamsBuilder`). This way, we can change the API in any way, but it would be a long-term solution only.
   
   It might also help with regard to the new PAPI that uses `Record` instead of `<K,V>` types, which is not easily adopted for `transform()` (and siblings). We could change the whole DSL to `Record` (i.e., `KStream<Record<K,V>>`; of course we don't need `Record` in the generic type, it's just for illustrative purposes). It would also cover the "add headers" KIP, fix KIP-300, let us introduce a `PartitionedKStream` (cf. the current KIP-759 discussion), and address a few other minor issues (like renaming `KGroupedStream` to `GroupedKStream`) all at once... And we could clean up the topology optimization step and the operator naming rules (it is a big mess to understand which `Named` object overwrites which...). We could also get rid of the wrappers from `KeyValueStore` to `TimestampedKeyValueStore` and change the interface from `Materialized<XxxStore>` to `Materialized<TimestampedXxxStore>`. In the past it was never worth starting a new DSL, but it seems we have now collected enough individual cases to maybe justify this investment?
   
   The only thing that we should consider is our investment into "versioned state stores / versioned KTables". If we build a new DSL, it should be compatible with it. If we cannot guarantee that, we might want to wait until we understand what API we need for versioned KTables in the DSL and make the cut afterwards?
   
   \cc @ableegoldman @guozhangwang @vvcephei @bbejeck @cadonna (also @inponomarev @jeqo @vcrfxia)







[GitHub] [kafka] mjsax commented on a change in pull request #11124: KAFKA-12839: Let SlidingWindows aggregation support window size of 0

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#discussion_r682192656



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java
##########
@@ -19,33 +19,24 @@
 import org.apache.kafka.streams.kstream.Window;
 
 /**
- * A {@link TimeWindow} covers a half-open time interval with its start timestamp as an inclusive boundary and its end
- * timestamp as exclusive boundary.
- * It is a fixed size window, i.e., all instances (of a single {@link org.apache.kafka.streams.kstream.TimeWindows
- * window specification}) will have the same size.
- * <p>
- * For time semantics, see {@link org.apache.kafka.streams.processor.TimestampExtractor TimestampExtractor}.
+ * A {@link TimeWindow} is a time interval window container that holds the start and end time for use in window-agnostic cases,
+ * ex: in {@link org.apache.kafka.streams.state.WindowStore}, we'll store the aggregated values of any fixed-size types of time windows.
+ * We use {@link TimeWindow} to represent these time windows
  *
  * @see SessionWindow
  * @see UnlimitedWindow
- * @see org.apache.kafka.streams.kstream.TimeWindows
- * @see org.apache.kafka.streams.processor.TimestampExtractor
  */
 public class TimeWindow extends Window {
 
     /**
-     * Create a new window for the given start time (inclusive) and end time (exclusive).
+     * Create a new window for the given start time and end time.
      *
-     * @param startMs the start timestamp of the window (inclusive)
-     * @param endMs   the end timestamp of the window (exclusive)
-     * @throws IllegalArgumentException if {@code startMs} is negative or if {@code endMs} is smaller than or equal to
-     * {@code startMs}
+     * @param startMs the start timestamp of the window

Review comment:
       You add `(inclusive)` and `(exclusive)` in `SessionWindow` but remove it here. Seems inconsistent?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -351,7 +351,8 @@ private void processEarly(final K key, final V value, final long inputRecordTime
             }
 
             if (combinedWindow == null) {
-                final TimeWindow window = new TimeWindow(0, windows.timeDifferenceMs());
+                // created a [start, end] time interval window via SessionWindow
+                final SessionWindow window = new SessionWindow(0, windows.timeDifferenceMs());

Review comment:
       I would prefer to _first_ rename the existing windows, and not merge this PR while it uses `SessionWindow` within `SlidingWindowAggregate`...

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java
##########
@@ -19,33 +19,24 @@
 import org.apache.kafka.streams.kstream.Window;
 
 /**
- * A {@link TimeWindow} covers a half-open time interval with its start timestamp as an inclusive boundary and its end
- * timestamp as exclusive boundary.
- * It is a fixed size window, i.e., all instances (of a single {@link org.apache.kafka.streams.kstream.TimeWindows
- * window specification}) will have the same size.
- * <p>
- * For time semantics, see {@link org.apache.kafka.streams.processor.TimestampExtractor TimestampExtractor}.
+ * A {@link TimeWindow} is a time interval window container that holds the start and end time for use in window-agnostic cases,

Review comment:
       Why `window-agnostic`? In general, I am not sure why we need to change the existing JavaDocs. What information do you think is missing or wrong?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java
##########
@@ -19,33 +19,24 @@
 import org.apache.kafka.streams.kstream.Window;
 
 /**
- * A {@link TimeWindow} covers a half-open time interval with its start timestamp as an inclusive boundary and its end
- * timestamp as exclusive boundary.
- * It is a fixed size window, i.e., all instances (of a single {@link org.apache.kafka.streams.kstream.TimeWindows
- * window specification}) will have the same size.
- * <p>
- * For time semantics, see {@link org.apache.kafka.streams.processor.TimestampExtractor TimestampExtractor}.
+ * A {@link TimeWindow} is a time interval window container that holds the start and end time for use in window-agnostic cases,
+ * ex: in {@link org.apache.kafka.streams.state.WindowStore}, we'll store the aggregated values of any fixed-size types of time windows.
+ * We use {@link TimeWindow} to represent these time windows
  *
  * @see SessionWindow
  * @see UnlimitedWindow
- * @see org.apache.kafka.streams.kstream.TimeWindows
- * @see org.apache.kafka.streams.processor.TimestampExtractor
  */
 public class TimeWindow extends Window {
 
     /**
-     * Create a new window for the given start time (inclusive) and end time (exclusive).
+     * Create a new window for the given start time and end time.
      *
-     * @param startMs the start timestamp of the window (inclusive)
-     * @param endMs   the end timestamp of the window (exclusive)
-     * @throws IllegalArgumentException if {@code startMs} is negative or if {@code endMs} is smaller than or equal to
-     * {@code startMs}
+     * @param startMs the start timestamp of the window
+     * @param endMs   the end timestamp of the window
+     * @throws IllegalArgumentException if {@code startMs} is negative or if {@code endMs} is smaller than {@code startMs}
      */
     public TimeWindow(final long startMs, final long endMs) throws IllegalArgumentException {
         super(startMs, endMs);
-        if (startMs == endMs) {

Review comment:
       Why do you remove this check? A `TimeWindow` should not allow this case.

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java
##########
@@ -35,11 +35,6 @@
     private final TimeWindow window = new TimeWindow(start, end);
     private final SessionWindow sessionWindow = new SessionWindow(start, end);
 
-    @Test
-    public void endMustBeLargerThanStart() {
-        assertThrows(IllegalArgumentException.class, () -> new TimeWindow(start, start));
-    }

Review comment:
       Why do we need to remove this temporarily? 







[GitHub] [kafka] ableegoldman commented on pull request #11124: KAFKA-12839: Let SlidingWindows aggregation support window size of 0

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#issuecomment-904197111


   @dajac No, we're not aiming to get the fix into 2.8.1 -- no need to block on this





[GitHub] [kafka] showuon commented on a change in pull request #11124: KAFKA-12839: Let SlidingWindows aggregation support window size of 0

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#discussion_r689953353



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -100,17 +99,78 @@
 
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
     private final String threadId = Thread.currentThread().getName();
+    private final String topic = "topic";
+    private final String defaultInOrderName = "InOrder";
+    private final String defaultReverseName = "Reverse";
+    private final long defaultWindowSize = 10L;
+    private final long defaultRetentionPeriod = 5000L;
+
+    private WindowBytesStoreSupplier getStoreSupplier(final boolean inOrderIterator,
+                                                      final String inOrderName,
+                                                      final String reverseName,
+                                                      final long windowSize) {
+        return inOrderIterator
+            ? new InOrderMemoryWindowStoreSupplier(inOrderName, defaultRetentionPeriod, windowSize, false)
+            : Stores.inMemoryWindowStore(reverseName, ofMillis(defaultRetentionPeriod), ofMillis(windowSize), false);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testAggregateSmallInputWithZeroTimeDifference() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        // We use TimeWindow to represent the "windowed KTable" internally, so, the window size must be greater than 0 here
+        final WindowBytesStoreSupplier storeSupplier = getStoreSupplier(inOrderIterator, defaultInOrderName, defaultReverseName, 1L);

Review comment:
       @mjsax, this is the one remaining question in this PR. Because we use `TimeWindow` to represent the windowed key, we can only set a window size > 0. One solution suggested by @ableegoldman is to have a window container that only holds the start and end time. What do you think? 







[GitHub] [kafka] showuon commented on pull request #11124: KAFKA-12839: Let SlidingWindows aggregation support window size of 0

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#issuecomment-894973913


   @mjsax, as discussed, I've created a `SlidingWindow` class for the `SlidingWindows` aggregation, so `SlidingWindows` can now support a window size of 0. 
   
   But there's still one issue left: 
   > We can't create a state store with a window size of 0 because we use `TimeWindow` to represent the "windowed KTable" result.
   
   The discussion thread is here: https://github.com/apache/kafka/pull/11124#discussion_r678908862
   
   We could create another Jira ticket to handle it, but do you have any suggestions for this issue? 
   
   Thank you.





[GitHub] [kafka] showuon commented on a change in pull request #11124: KAFKA-12839: use sessionWindow to represent SlidingWindows

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#discussion_r678905114



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -100,17 +99,79 @@
 
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
     private final String threadId = Thread.currentThread().getName();
+    private final String topic = "topic";
+    private final String defaultInOrderName = "InOrder";
+    private final String defaultReverseName = "Reverse";
+    private final long defaultWindowSize = 10L;
+    private final long defaultRetentionPeriod = 5000L;
+
+    private WindowBytesStoreSupplier getStoreSupplier(final boolean inOrderIterator,
+                                                      final String inOrderName,
+                                                      final String reverseName,
+                                                      final long windowSize) {
+        return inOrderIterator
+            ? new InOrderMemoryWindowStoreSupplier(inOrderName, defaultRetentionPeriod, windowSize, false)
+            : Stores.inMemoryWindowStore(reverseName, ofMillis(defaultRetentionPeriod), ofMillis(windowSize), false);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testAggregateSmallInputWithZeroTimeDifference() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        // We use CachingWindowStore to store the aggregated values internally, and then use TimeWindow to represent the "windowed KTable"
+        // thus, the window size must be greater than 0 here
+        final WindowBytesStoreSupplier storeSupplier = getStoreSupplier(inOrderIterator, defaultInOrderName, defaultReverseName, 1L);
+
+        // we should support the "zero" time difference since sliding window is both start and end time inclusive
+        final KTable<Windowed<String>, String> table = builder
+            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(0), ofMillis(50)))
+            .aggregate(

Review comment:
       After this PR, we can create a sliding window aggregation with a window size (time difference) of 0.
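As a hedged illustration of the comment in the test above (sliding windows are inclusive at both ends), the sketch below counts the records in the closed window `[ts - timeDifference, ts]` by brute force. `SlidingSketch` and `leftWindowCount` are hypothetical names; the sketch ignores grace periods, right windows, and the incremental bookkeeping the real sliding-window processor does. With a time difference of 0 the window is `[ts, ts]`, which still holds every record sharing that timestamp.

```java
import java.util.List;

// Hypothetical brute-force model of a sliding window that is inclusive at both ends.
final class SlidingSketch {
    // Count records whose timestamp falls in the closed window [ts - timeDifferenceMs, ts],
    // i.e. the window that ends at the record with timestamp ts.
    static int leftWindowCount(final List<Long> timestamps, final long ts, final long timeDifferenceMs) {
        final long start = ts - timeDifferenceMs;
        int count = 0;
        for (final long t : timestamps) {
            if (t >= start && t <= ts) { // both bounds inclusive
                count++;
            }
        }
        return count;
    }
}
```

For timestamps `10, 10, 11` and a time difference of 0, the window ending at 10 holds two records and the window ending at 11 holds one, so a zero time difference is still meaningful.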







[GitHub] [kafka] showuon commented on a change in pull request #11124: KAFKA-12839: use sessionWindow to represent SlidingWindows

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#discussion_r678908862



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -100,17 +99,79 @@
 
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
     private final String threadId = Thread.currentThread().getName();
+    private final String topic = "topic";
+    private final String defaultInOrderName = "InOrder";
+    private final String defaultReverseName = "Reverse";
+    private final long defaultWindowSize = 10L;
+    private final long defaultRetentionPeriod = 5000L;
+
+    private WindowBytesStoreSupplier getStoreSupplier(final boolean inOrderIterator,
+                                                      final String inOrderName,
+                                                      final String reverseName,
+                                                      final long windowSize) {
+        return inOrderIterator
+            ? new InOrderMemoryWindowStoreSupplier(inOrderName, defaultRetentionPeriod, windowSize, false)
+            : Stores.inMemoryWindowStore(reverseName, ofMillis(defaultRetentionPeriod), ofMillis(windowSize), false);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testAggregateSmallInputWithZeroTimeDifference() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        // We use CachingWindowStore to store the aggregated values internally, and then use TimeWindow to represent the "windowed KTable"
+        // thus, the window size must be greater than 0 here
+        final WindowBytesStoreSupplier storeSupplier = getStoreSupplier(inOrderIterator, defaultInOrderName, defaultReverseName, 1L);

Review comment:
       We cannot create a store supplier with a window size of 0 here because we use `TimeWindow` to represent the "windowed KTable" result, so I use a window size of 1 instead. (I'm not sure whether this should be treated as an improvement or whether it is expected.)
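A minimal sketch of why a size of 0 fails, assuming (as the comment above implies) that the store rebuilds each result window as a half-open `[start, end)` window computed as `start + windowSize`. `HalfOpenWindow`, `WindowRebuild`, `windowForSize`, and `canRebuild` are hypothetical stand-ins, not Kafka Streams APIs; the constructor check mirrors the `TimeWindow(start, start)` rejection exercised by the `endMustBeLargerThanStart` test in this PR.

```java
// Hypothetical stand-in for a half-open [start, end) window such as TimeWindow.
final class HalfOpenWindow {
    final long startMs;
    final long endMs;

    HalfOpenWindow(final long startMs, final long endMs) {
        // [t, t) contains no timestamps, so a half-open window must have end > start.
        if (endMs <= startMs) {
            throw new IllegalArgumentException("endMs must be greater than startMs");
        }
        this.startMs = startMs;
        this.endMs = endMs;
    }
}

final class WindowRebuild {
    // Hypothetical helper: rebuild a window from its stored start time and the store's window size.
    static HalfOpenWindow windowForSize(final long startMs, final long windowSize) {
        long endMs = startMs + windowSize;
        if (endMs < 0) {
            // the addition overflowed, so clamp the end to the largest timestamp
            endMs = Long.MAX_VALUE;
        }
        return new HalfOpenWindow(startMs, endMs);
    }

    // True if a store with this window size can rebuild the window starting at startMs.
    static boolean canRebuild(final long startMs, final long windowSize) {
        try {
            windowForSize(startMs, windowSize);
            return true;
        } catch (final IllegalArgumentException e) {
            return false;
        }
    }
}
```

With these definitions, `canRebuild(100L, 0L)` is false while `canRebuild(100L, 1L)` is true, which matches the test's fallback to a window size of 1.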







[GitHub] [kafka] showuon commented on pull request #11124: KAFKA-12839: Let SlidingWindows aggregation support window size of 0

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#issuecomment-1030946835


   Makes sense to me. I'll close this PR and cycle back to it after the DSL redesign. Thanks.





[GitHub] [kafka] showuon commented on a change in pull request #11124: KAFKA-12839: use sessionWindow to represent SlidingWindows

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#discussion_r679682895



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java
##########
@@ -35,11 +35,6 @@
     private final TimeWindow window = new TimeWindow(start, end);
     private final SessionWindow sessionWindow = new SessionWindow(start, end);
 
-    @Test
-    public void endMustBeLargerThanStart() {
-        assertThrows(IllegalArgumentException.class, () -> new TimeWindow(start, start));
-    }

Review comment:
       I temporarily removed this test; it will be added back once KAFKA-13145 (the `TimeWindow` renaming story) is implemented.







[GitHub] [kafka] showuon commented on pull request #11124: KAFKA-12839: use sessionWindow to represent SlidingWindows

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#issuecomment-888883904


   @mjsax @ableegoldman @lct45, I've added tests. Please help review. Thank you.





[GitHub] [kafka] showuon closed pull request #11124: KAFKA-12839: Let SlidingWindows aggregation support window size of 0

Posted by GitBox <gi...@apache.org>.
showuon closed pull request #11124:
URL: https://github.com/apache/kafka/pull/11124


   





[GitHub] [kafka] mjsax commented on pull request #11124: KAFKA-12839: Let SlidingWindows aggregation support window size of 0

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#issuecomment-1030907820


   @showuon -- should we close this PR for now?
   
   We have actually collected a few tickets that are not easy to fix without a DSL redesign. I would propose adding a new Jira label to tag all relevant tickets and holding off on working on them for now. When we do a DSL redesign, we can cycle back?





[GitHub] [kafka] showuon commented on a change in pull request #11124: KAFKA-12839: use sessionWindow to represent SlidingWindows

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#discussion_r679682334



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -100,17 +99,78 @@
 
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
     private final String threadId = Thread.currentThread().getName();
+    private final String topic = "topic";
+    private final String defaultInOrderName = "InOrder";
+    private final String defaultReverseName = "Reverse";
+    private final long defaultWindowSize = 10L;
+    private final long defaultRetentionPeriod = 5000L;
+
+    private WindowBytesStoreSupplier getStoreSupplier(final boolean inOrderIterator,
+                                                      final String inOrderName,
+                                                      final String reverseName,
+                                                      final long windowSize) {
+        return inOrderIterator
+            ? new InOrderMemoryWindowStoreSupplier(inOrderName, defaultRetentionPeriod, windowSize, false)
+            : Stores.inMemoryWindowStore(reverseName, ofMillis(defaultRetentionPeriod), ofMillis(windowSize), false);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testAggregateSmallInputWithZeroTimeDifference() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        // create a window store supplier with window size of 0
+        final WindowBytesStoreSupplier storeSupplier = getStoreSupplier(inOrderIterator, defaultInOrderName, defaultReverseName, 0L);

Review comment:
       We can create a window store with a window size of 0 now.







[GitHub] [kafka] showuon commented on pull request #11124: KAFKA-12839: Let SlidingWindows aggregation support window size of 0

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#issuecomment-889830983


   @ableegoldman, please take a look and let me know whether this is what you expected. Thank you.





[GitHub] [kafka] showuon commented on a change in pull request #11124: KAFKA-12839: use sessionWindow to represent SlidingWindows

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#discussion_r679682023



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java
##########
@@ -19,33 +19,24 @@
 import org.apache.kafka.streams.kstream.Window;
 
 /**
- * A {@link TimeWindow} covers a half-open time interval with its start timestamp as an inclusive boundary and its end
- * timestamp as exclusive boundary.
- * It is a fixed size window, i.e., all instances (of a single {@link org.apache.kafka.streams.kstream.TimeWindows
- * window specification}) will have the same size.
- * <p>
- * For time semantics, see {@link org.apache.kafka.streams.processor.TimestampExtractor TimestampExtractor}.
+ * A {@link TimeWindow} is a time interval window container that holds the start and end time for use in window-agnostic cases,
+ * ex: in {@link org.apache.kafka.streams.state.WindowStore}, we'll store the aggregated values of any fixed-size types of time windows.
+ * We use {@link TimeWindow} to represent these time windows

Review comment:
       Changed the `TimeWindow` JavaDoc description.







[GitHub] [kafka] showuon commented on a change in pull request #11124: KAFKA-12839: use sessionWindow to represent SlidingWindows

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#discussion_r679613226



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -100,17 +99,79 @@
 
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
     private final String threadId = Thread.currentThread().getName();
+    private final String topic = "topic";
+    private final String defaultInOrderName = "InOrder";
+    private final String defaultReverseName = "Reverse";
+    private final long defaultWindowSize = 10L;
+    private final long defaultRetentionPeriod = 5000L;
+
+    private WindowBytesStoreSupplier getStoreSupplier(final boolean inOrderIterator,
+                                                      final String inOrderName,
+                                                      final String reverseName,
+                                                      final long windowSize) {
+        return inOrderIterator
+            ? new InOrderMemoryWindowStoreSupplier(inOrderName, defaultRetentionPeriod, windowSize, false)
+            : Stores.inMemoryWindowStore(reverseName, ofMillis(defaultRetentionPeriod), ofMillis(windowSize), false);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testAggregateSmallInputWithZeroTimeDifference() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        // We use CachingWindowStore to store the aggregated values internally, and then use TimeWindow to represent the "windowed KTable"
+        // thus, the window size must be greater than 0 here
+        final WindowBytesStoreSupplier storeSupplier = getStoreSupplier(inOrderIterator, defaultInOrderName, defaultReverseName, 1L);

Review comment:
       Makes sense to me! I'll work on it. Thanks for the suggestion!







[GitHub] [kafka] showuon commented on pull request #11124: [WIP] KAFKA-12839: use sessionWindow to represent SlidingWindows

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#issuecomment-888268160


   @ableegoldman, really glad we have the same thought about it! (high-five :) )
   I created a Jira ticket, KAFKA-13145, for this renaming improvement, to see if there are other opinions there. 
   
   Regarding your suggestion: yes, I agree most people probably do know what open/closed mean, but inclusive/exclusive is more to the point. +1
   
   Thank you.





[GitHub] [kafka] showuon commented on pull request #11124: [WIP] KAFKA-12839: use sessionWindow to represent SlidingWindows

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#issuecomment-887989639


   @mjsax @ableegoldman, I found we already have a `SessionWindow` that represents the closed time interval `[start,end]`, so I use it directly to create the windows for the `SlidingWindows` aggregation. I think this is what we want, right?
   
   I have another thought, which is to rename the time-interval-related windows. Currently, we have 3 types of time interval windows:
   `TimeWindow` -> to have `[start,end)` time interval
   `SessionWindow` -> to have `[start,end]` time interval
   `UnlimitedWindow` -> to have `[start, MAX_VALUE)` time interval
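The three interval types differ only in which boundaries are inclusive. The sketch below models them with a hypothetical `IntervalKind` enum (not the actual window classes) to show the membership and overlap rules for each, including why `[t, t)` is empty while `[t, t]` contains exactly `t`.

```java
// Hypothetical model of the three interval shapes; not the Kafka Streams classes themselves.
enum IntervalKind {
    HALF_OPEN,   // [start, end), like TimeWindow
    CLOSED,      // [start, end], like SessionWindow
    UNBOUNDED;   // [start, Long.MAX_VALUE), like UnlimitedWindow

    // Does a window of this kind contain the timestamp ts?
    boolean contains(final long start, final long end, final long ts) {
        switch (this) {
            case HALF_OPEN:
                return ts >= start && ts < end;
            case CLOSED:
                return ts >= start && ts <= end;
            default: // UNBOUNDED ignores the end argument
                return ts >= start && ts < Long.MAX_VALUE;
        }
    }

    // Do two windows of this kind share at least one timestamp?
    // Assumes valid windows: end > start for HALF_OPEN, end >= start for CLOSED.
    boolean overlaps(final long startA, final long endA, final long startB, final long endB) {
        switch (this) {
            case HALF_OPEN:
                return startA < endB && startB < endA;
            case CLOSED:
                return startA <= endB && startB <= endA;
            default: // two intervals running to Long.MAX_VALUE always share their later start
                return true;
        }
    }
}
```

For example, `HALF_OPEN.overlaps(0, 5, 5, 10)` is false because the two windows only touch at 5, while the `CLOSED` version returns true.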
   
   I think the name `SessionWindow` is definitely not a good fit here, especially since we now want to use it in `SlidingWindows`, although it was previously only used for `SessionWindows`. We should name these classes after the time interval they represent, not after the streaming window operation that uses them. For example:
   `TimeWindow` ->  `LeftClosedRightOpenWindow`
   `SessionWindow` -> `ClosedTimeWindow`
   `UnlimitedWindow` -> `LeftClosedWindow`
   ref: the `Classification of intervals` section in https://en.wikipedia.org/wiki/Interval_(mathematics)
   
   Because these 3 window types are for internal use only, it is safe to rename them. What do you think?





[GitHub] [kafka] ableegoldman commented on a change in pull request #11124: KAFKA-12839: use sessionWindow to represent SlidingWindows

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#discussion_r679413628



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -100,17 +99,79 @@
 
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
     private final String threadId = Thread.currentThread().getName();
+    private final String topic = "topic";
+    private final String defaultInOrderName = "InOrder";
+    private final String defaultReverseName = "Reverse";
+    private final long defaultWindowSize = 10L;
+    private final long defaultRetentionPeriod = 5000L;
+
+    private WindowBytesStoreSupplier getStoreSupplier(final boolean inOrderIterator,
+                                                      final String inOrderName,
+                                                      final String reverseName,
+                                                      final long windowSize) {
+        return inOrderIterator
+            ? new InOrderMemoryWindowStoreSupplier(inOrderName, defaultRetentionPeriod, windowSize, false)
+            : Stores.inMemoryWindowStore(reverseName, ofMillis(defaultRetentionPeriod), ofMillis(windowSize), false);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testAggregateSmallInputWithZeroTimeDifference() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        // We use CachingWindowStore to store the aggregated values internally, and then use TimeWindow to represent the "windowed KTable"
+        // thus, the window size must be greater than 0 here
+        final WindowBytesStoreSupplier storeSupplier = getStoreSupplier(inOrderIterator, defaultInOrderName, defaultReverseName, 1L);

Review comment:
       Are you saying the `CachingWindowStore` internally uses a `TimeWindow`? Or is the `TimeWindow` somewhere along the store supplier code path...?
   
   Either way, doesn't this mean there's still a hole in the API since you can't use a custom WindowStore for a sliding windowed aggregation with the windowSize set to 0? If the WindowStore is going to represent different kinds of constant-size windows, it should probably be agnostic to the specific type of constant-sized window.







[GitHub] [kafka] showuon commented on a change in pull request #11124: KAFKA-12839: Let SlidingWindows aggregation support window size of 0

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#discussion_r682231711



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##########
@@ -100,17 +99,79 @@
 
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
     private final String threadId = Thread.currentThread().getName();
+    private final String topic = "topic";
+    private final String defaultInOrderName = "InOrder";
+    private final String defaultReverseName = "Reverse";
+    private final long defaultWindowSize = 10L;
+    private final long defaultRetentionPeriod = 5000L;
+
+    private WindowBytesStoreSupplier getStoreSupplier(final boolean inOrderIterator,
+                                                      final String inOrderName,
+                                                      final String reverseName,
+                                                      final long windowSize) {
+        return inOrderIterator
+            ? new InOrderMemoryWindowStoreSupplier(inOrderName, defaultRetentionPeriod, windowSize, false)
+            : Stores.inMemoryWindowStore(reverseName, ofMillis(defaultRetentionPeriod), ofMillis(windowSize), false);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testAggregateSmallInputWithZeroTimeDifference() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        // We use CachingWindowStore to store the aggregated values internally, and then use TimeWindow to represent the "windowed KTable"
+        // thus, the window size must be greater than 0 here
+        final WindowBytesStoreSupplier storeSupplier = getStoreSupplier(inOrderIterator, defaultInOrderName, defaultReverseName, 1L);

Review comment:
       @mjsax, I'm answering your review comments here. Please check the discussion above for more info.
   
   > Why window-agnostic ? In general, I am not sure why we need to change the existing JavaDocs? What information do you think is missing or wrong?
   
   > You add (inclusive) and (exclusive) in SessionWindow but remove it here. Seems inconsistent?
   
   > Why do you remove this check? A TimeWindow should not allow this case.
   
   > Why do we need to remove this temporarily?
   
   --> The answer to all of the above questions is that we can't create a store supplier with a window size of 0 here, because we use `TimeWindow` to represent the "windowed KTable" result. @ableegoldman and I both thought it doesn't make sense to use `TimeWindow` for this if `WindowStore` holds both inclusive-exclusive and inclusive-inclusive windows; we should have a neutral time window for this case. That's why Sophie suggested a container class that does nothing but hold the start and end time, for use in window-agnostic cases like the `CachingWindowStore`. That container class could take the name `TimeWindow`, since we were planning to eventually rename the current `TimeWindow` to `InclusiveExclusiveWindow`. That's why I changed the JavaDoc, the start/end time check, and the test for it. 
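A minimal sketch of the proposed container, assuming it only has to carry the two timestamps and leaves boundary semantics to the caller. The class name `TimeInterval` and its validation rules (non-negative start, end not before start) are assumptions for illustration; the thread considered reusing the name `TimeWindow` instead.

```java
import java.util.Objects;

// Hypothetical window-agnostic container: it records start and end but does not decide
// whether the end is inclusive, so a store could hold both [start, end) and [start, end] windows.
final class TimeInterval {
    private final long startMs;
    private final long endMs;

    TimeInterval(final long startMs, final long endMs) {
        if (startMs < 0) {
            throw new IllegalArgumentException("startMs must be non-negative");
        }
        if (endMs < startMs) {
            throw new IllegalArgumentException("endMs must not be smaller than startMs");
        }
        // endMs == startMs is allowed, which is what a window size of 0 needs
        this.startMs = startMs;
        this.endMs = endMs;
    }

    long startMs() { return startMs; }

    long endMs() { return endMs; }

    long sizeMs() { return endMs - startMs; }

    @Override
    public boolean equals(final Object o) {
        if (this == o) return true;
        if (!(o instanceof TimeInterval)) return false;
        final TimeInterval other = (TimeInterval) o;
        return startMs == other.startMs && endMs == other.endMs;
    }

    @Override
    public int hashCode() {
        return Objects.hash(startMs, endMs);
    }
}
```

A store configured with a window size of 0 could then rebuild a result window as `new TimeInterval(start, start)` without tripping a half-open check; whether the end is inclusive would be decided by the windowed operation, not by the container.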
   
   So, since we agreed that we won't rename the windows, I'll revert those changes. But one question remains:  
   _We can't create a store supplier with a window size of 0 here because we use `TimeWindow` to represent the "windowed KTable" result._
   Do you agree that we should have a container class that does nothing but hold the start and end time, for use in window-agnostic cases like the `CachingWindowStore`? Or do you have any other suggestions?
   
   Thank you.
   
   
   



