Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/02 08:20:19 UTC

[GitHub] [kafka] cadonna commented on a change in pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations

cadonna commented on a change in pull request #10926:
URL: https://github.com/apache/kafka/pull/10926#discussion_r662830942



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
##########
@@ -81,18 +90,63 @@ private TimeWindows(final long sizeMs, final long advanceMs, final long graceMs)
      * <p>
      * This provides the semantics of tumbling windows, which are fixed-sized, gap-less, non-overlapping windows.
      * Tumbling windows are a special case of hopping windows with {@code advance == size}.
+     * Using the method implicitly sets the grace period to zero which means
+     * that out of order records arriving after the window end will be dropped
      *
      * @param size The size of the window
-     * @return a new window definition with default maintain duration of 1 day
+     * @return a new window definition with default no grace period. Note that this means out of order records arriving after the window end will be dropped
+     * @throws IllegalArgumentException if the specified window size is zero or negative or can't be represented as {@code long milliseconds}
+     */
+    public static TimeWindows ofSizeWithNoGrace(final Duration size) throws IllegalArgumentException {
+        return ofSizeAndGrace(size, ofMillis(NO_GRACE_PERIOD));
+    }
+
+    /**
+     * Return a window definition with the given window size, and with the advance interval being equal to the window
+     * size.
+     * The time interval represented by the N-th window is: {@code [N * size, N * size + size)}.
+     * <p>
+     * This provides the semantics of tumbling windows, which are fixed-sized, gap-less, non-overlapping windows.
+     * Tumbling windows are a special case of hopping windows with {@code advance == size}.
+     * Using the method explicitly sets the grace period to the duration specified by {@code afterWindowEnd} which means
+     * that out of order records arriving after the window end will be dropped.
+     *
+     * <p>
+     * Delay is defined as (stream_time - record_timestamp).
+     *
+     * @param size The size of the window. Must be larger than zero
+     * @param afterWindowEnd The grace period to admit out-of-order events to a window. Must be non-negative.
+     * @return a TimeWindows object with the specified size and the specified grace period
+     * @throws IllegalArgumentException if {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds}
+     */
+    public static TimeWindows ofSizeAndGrace(final Duration size, final Duration afterWindowEnd)
+            throws IllegalArgumentException {
+
+        final long sizeMs = size.toMillis();
+        final long afterWindowEndMs = afterWindowEnd.toMillis();
+
+        return new TimeWindows(sizeMs, sizeMs, afterWindowEndMs);
+    }
+
+    /**
+     * Return a window definition with the given window size, and with the advance interval being equal to the window
+     * size.
+     * The time interval represented by the N-th window is: {@code [N * size, N * size + size)}.
+     * <p>
+     * This provides the semantics of tumbling windows, which are fixed-sized, gap-less, non-overlapping windows.
+     * Tumbling windows are a special case of hopping windows with {@code advance == size}.
+     *
+     * @param size The size of the window
+     * @return a new window definition without specifying the grace period (uses old default of 24 hours)
      * @throws IllegalArgumentException if the specified window size is zero or negative or can't be represented as {@code long milliseconds}
+     * @deprecated since 3.0 Use {@link #ofSizeWithNoGrace(Duration)} } instead
      */
+    @Deprecated
     public static TimeWindows of(final Duration size) throws IllegalArgumentException {
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(size, "size");
         final long sizeMs = validateMillisecondDuration(size, msgPrefix);
-        if (sizeMs <= 0) {
-            throw new IllegalArgumentException("Window size (sizeMs) must be larger than zero.");
-        }
-        return new TimeWindows(sizeMs, sizeMs, DEFAULT_GRACE_PERIOD_MS);
+
+        return new TimeWindows(sizeMs, sizeMs, DEPRECATED_OLD_24_HR_GRACE_PERIOD);

Review comment:
       This is not completely compatible with the behavior of older Streams apps. See #10953 for more details.
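
For readers following the Javadoc in the hunk above, here is a minimal, self-contained sketch of the tumbling-window interval `[N * size, N * size + size)` and of how a grace period changes whether an out-of-order record is admitted. It does not use the Kafka Streams API: the class `TumblingWindowSketch` and its methods are hypothetical names introduced only for illustration, and the rule "a window closes once stream time reaches window end plus grace" is an assumption about the intended semantics, not code taken from this pull request.

```java
import java.time.Duration;

// Hypothetical stand-in for illustration only; not part of Kafka Streams.
public class TumblingWindowSketch {

    // Start of the tumbling window that contains the timestamp: N * size with N = floor(ts / size).
    static long windowStart(final long timestampMs, final Duration size) {
        final long sizeMs = size.toMillis();
        if (sizeMs <= 0) {
            throw new IllegalArgumentException("Window size must be larger than zero.");
        }
        return Math.floorDiv(timestampMs, sizeMs) * sizeMs;
    }

    // Assumed rule: a record is admitted while stream time is still before (window end + grace).
    static boolean isAdmitted(final long recordTimestampMs,
                              final long streamTimeMs,
                              final Duration size,
                              final Duration grace) {
        final long graceMs = grace.toMillis();
        if (graceMs < 0) {
            throw new IllegalArgumentException("Grace period must not be negative.");
        }
        final long windowEndMs = windowStart(recordTimestampMs, size) + size.toMillis();
        return streamTimeMs < windowEndMs + graceMs;
    }

    public static void main(final String[] args) {
        final Duration size = Duration.ofSeconds(10);
        final long recordTs = 9_000L;    // falls into window [0, 10000)
        final long streamTime = 12_000L; // stream time is already past the window end

        // With no grace period, the out-of-order record is dropped.
        System.out.println("no grace:  " + isAdmitted(recordTs, streamTime, size, Duration.ZERO));
        // With a 5 second grace period, the same record is still admitted.
        System.out.println("5s grace:  " + isAdmitted(recordTs, streamTime, size, Duration.ofSeconds(5)));
    }
}
```

Under these assumptions, the same record is dropped with a zero grace period and admitted with a five-second one, which is the distinction the `ofSizeWithNoGrace` and `ofSizeAndGrace` Javadoc is trying to describe.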



