Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/08/04 00:37:51 UTC

[GitHub] [kafka] mjsax commented on a change in pull request #11124: KAFKA-12839: Let SlidingWindows aggregation support window size of 0

mjsax commented on a change in pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#discussion_r682192656



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java
##########
@@ -19,33 +19,24 @@
 import org.apache.kafka.streams.kstream.Window;
 
 /**
- * A {@link TimeWindow} covers a half-open time interval with its start timestamp as an inclusive boundary and its end
- * timestamp as exclusive boundary.
- * It is a fixed size window, i.e., all instances (of a single {@link org.apache.kafka.streams.kstream.TimeWindows
- * window specification}) will have the same size.
- * <p>
- * For time semantics, see {@link org.apache.kafka.streams.processor.TimestampExtractor TimestampExtractor}.
+ * A {@link TimeWindow} is a time interval window container that holds the start and end time for use in window-agnostic cases,
+ * ex: in {@link org.apache.kafka.streams.state.WindowStore}, we'll store the aggregated values of any fixed-size types of time windows.
+ * We use {@link TimeWindow} to represent these time windows
  *
  * @see SessionWindow
  * @see UnlimitedWindow
- * @see org.apache.kafka.streams.kstream.TimeWindows
- * @see org.apache.kafka.streams.processor.TimestampExtractor
  */
 public class TimeWindow extends Window {
 
     /**
-     * Create a new window for the given start time (inclusive) and end time (exclusive).
+     * Create a new window for the given start time and end time.
      *
-     * @param startMs the start timestamp of the window (inclusive)
-     * @param endMs   the end timestamp of the window (exclusive)
-     * @throws IllegalArgumentException if {@code startMs} is negative or if {@code endMs} is smaller than or equal to
-     * {@code startMs}
+     * @param startMs the start timestamp of the window

Review comment:
       You add `(inclusive)` and `(exclusive)` in `SessionWindow` but remove them here. Seems inconsistent?
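
       To illustrate why the boundary annotations matter, here is a minimal sketch (not the Kafka source; the class and method names are hypothetical) comparing an overlap check under half-open `[start, end)` semantics with one under closed `[start, end]` semantics. Two windows that merely touch at one timestamp overlap only in the closed case.

```java
public class WindowOverlapSketch {

    // Half-open intervals [start, end): the end timestamp is NOT part of the window.
    public static boolean overlapHalfOpen(final long startA, final long endA,
                                          final long startB, final long endB) {
        return startA < endB && startB < endA;
    }

    // Closed intervals [start, end]: both boundaries ARE part of the window.
    public static boolean overlapClosed(final long startA, final long endA,
                                        final long startB, final long endB) {
        return startA <= endB && startB <= endA;
    }

    public static void main(final String[] args) {
        // Windows 0..10 and 10..20 share only the boundary timestamp 10.
        System.out.println("half-open overlap: " + overlapHalfOpen(0, 10, 10, 20));
        System.out.println("closed overlap:    " + overlapClosed(0, 10, 10, 20));
    }
}
```

       If the JavaDocs drop `(inclusive)`/`(exclusive)`, a reader cannot tell which of these two behaviors to expect.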

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -351,7 +351,8 @@ private void processEarly(final K key, final V value, final long inputRecordTime
             }
 
             if (combinedWindow == null) {
-                final TimeWindow window = new TimeWindow(0, windows.timeDifferenceMs());
+                // created a [start, end] time interval window via SessionWindow
+                final SessionWindow window = new SessionWindow(0, windows.timeDifferenceMs());

Review comment:
       I would prefer to _first_ rename the existing window classes, rather than merge this PR with `SessionWindow` used inside `KStreamSlidingWindowAggregate`...

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java
##########
@@ -19,33 +19,24 @@
 import org.apache.kafka.streams.kstream.Window;
 
 /**
- * A {@link TimeWindow} covers a half-open time interval with its start timestamp as an inclusive boundary and its end
- * timestamp as exclusive boundary.
- * It is a fixed size window, i.e., all instances (of a single {@link org.apache.kafka.streams.kstream.TimeWindows
- * window specification}) will have the same size.
- * <p>
- * For time semantics, see {@link org.apache.kafka.streams.processor.TimestampExtractor TimestampExtractor}.
+ * A {@link TimeWindow} is a time interval window container that holds the start and end time for use in window-agnostic cases,

Review comment:
       Why `window-agnostic`? In general, I am not sure why we need to change the existing JavaDocs. What information do you think is missing or wrong?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java
##########
@@ -19,33 +19,24 @@
 import org.apache.kafka.streams.kstream.Window;
 
 /**
- * A {@link TimeWindow} covers a half-open time interval with its start timestamp as an inclusive boundary and its end
- * timestamp as exclusive boundary.
- * It is a fixed size window, i.e., all instances (of a single {@link org.apache.kafka.streams.kstream.TimeWindows
- * window specification}) will have the same size.
- * <p>
- * For time semantics, see {@link org.apache.kafka.streams.processor.TimestampExtractor TimestampExtractor}.
+ * A {@link TimeWindow} is a time interval window container that holds the start and end time for use in window-agnostic cases,
+ * ex: in {@link org.apache.kafka.streams.state.WindowStore}, we'll store the aggregated values of any fixed-size types of time windows.
+ * We use {@link TimeWindow} to represent these time windows
  *
  * @see SessionWindow
  * @see UnlimitedWindow
- * @see org.apache.kafka.streams.kstream.TimeWindows
- * @see org.apache.kafka.streams.processor.TimestampExtractor
  */
 public class TimeWindow extends Window {
 
     /**
-     * Create a new window for the given start time (inclusive) and end time (exclusive).
+     * Create a new window for the given start time and end time.
      *
-     * @param startMs the start timestamp of the window (inclusive)
-     * @param endMs   the end timestamp of the window (exclusive)
-     * @throws IllegalArgumentException if {@code startMs} is negative or if {@code endMs} is smaller than or equal to
-     * {@code startMs}
+     * @param startMs the start timestamp of the window
+     * @param endMs   the end timestamp of the window
+     * @throws IllegalArgumentException if {@code startMs} is negative or if {@code endMs} is smaller than {@code startMs}
      */
     public TimeWindow(final long startMs, final long endMs) throws IllegalArgumentException {
         super(startMs, endMs);
-        if (startMs == endMs) {

Review comment:
       Why do you remove this check? A `TimeWindow` should not allow this case.
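
       As a sketch of the reasoning (hypothetical classes, not Kafka's `TimeWindow` or `SessionWindow`): with a half-open `[start, end)` interval, `start == end` describes an empty window that can contain no timestamp, so rejecting it in the constructor is consistent. A closed `[start, end]` interval with `start == end` still contains exactly one timestamp, which is the case a window size of 0 would need.

```java
public class WindowBoundsSketch {

    /** Hypothetical half-open window [startMs, endMs). */
    public static final class HalfOpenWindow {
        private final long startMs;
        private final long endMs;

        public HalfOpenWindow(final long startMs, final long endMs) {
            if (startMs < 0) {
                throw new IllegalArgumentException("startMs must not be negative");
            }
            // start == end would be an empty interval, so it is rejected as well.
            if (endMs <= startMs) {
                throw new IllegalArgumentException("endMs must be larger than startMs");
            }
            this.startMs = startMs;
            this.endMs = endMs;
        }

        public boolean contains(final long timestampMs) {
            return startMs <= timestampMs && timestampMs < endMs;
        }
    }

    /** Hypothetical closed window [startMs, endMs]. */
    public static final class ClosedWindow {
        private final long startMs;
        private final long endMs;

        public ClosedWindow(final long startMs, final long endMs) {
            if (startMs < 0) {
                throw new IllegalArgumentException("startMs must not be negative");
            }
            // start == end is allowed: the window covers a single timestamp.
            if (endMs < startMs) {
                throw new IllegalArgumentException("endMs must not be smaller than startMs");
            }
            this.startMs = startMs;
            this.endMs = endMs;
        }

        public boolean contains(final long timestampMs) {
            return startMs <= timestampMs && timestampMs <= endMs;
        }
    }

    public static void main(final String[] args) {
        final ClosedWindow zeroSize = new ClosedWindow(5, 5);
        System.out.println("closed [5, 5] contains 5: " + zeroSize.contains(5));
        try {
            new HalfOpenWindow(5, 5);
        } catch (final IllegalArgumentException e) {
            System.out.println("half-open [5, 5) rejected: " + e.getMessage());
        }
    }
}
```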

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java
##########
@@ -35,11 +35,6 @@
     private final TimeWindow window = new TimeWindow(start, end);
     private final SessionWindow sessionWindow = new SessionWindow(start, end);
 
-    @Test
-    public void endMustBeLargerThanStart() {
-        assertThrows(IllegalArgumentException.class, () -> new TimeWindow(start, start));
-    }

Review comment:
       Why do we need to remove this temporarily? 



