Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/06/28 06:28:27 UTC

[GitHub] [kafka] showuon commented on a change in pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations

showuon commented on a change in pull request #10926:
URL: https://github.com/apache/kafka/pull/10926#discussion_r659484507



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -92,32 +100,61 @@ private JoinWindows(final long beforeMs,
         if (beforeMs + afterMs < 0) {
             throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative.");
         }
+
+        if (graceMs < 0) {
+            throw new IllegalArgumentException("Grace period must not be negative.");
+        }
+
         this.afterMs = afterMs;
         this.beforeMs = beforeMs;
         this.graceMs = graceMs;
         this.enableSpuriousResultFix = enableSpuriousResultFix;
     }
 
+    /**
+     * Reject out-of-order events that are delayed more than {@code afterWindowEnd}
+     * after the end of its window.
+     * <p>
+     * Delay is defined as (stream_time - record_timestamp).
+     *
+     * @param timeDifference join window interval
+     * @param afterWindowEnd The grace period to admit out-of-order events to a window.
+     * @throws IllegalArgumentException if the {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds}
+     * @return A new JoinWindows object with the specified window definition and grace period
+     */
     public static JoinWindows ofTimeDifferenceAndGrace(final Duration timeDifference, final Duration afterWindowEnd) {
         return new JoinWindows(timeDifference.toMillis(), timeDifference.toMillis(), afterWindowEnd.toMillis(), true);
     }
 
+    /**
+     * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
+     * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
+     * the timestamp of the record from the primary stream. Using the method implicitly sets the grace period to zero
+     * which means that out of order records arriving after the window end will be dropped.

Review comment:
       I expected the javadoc for `ofTimeDifferenceWithNoGrace` to be mostly the same as the javadoc for `ofTimeDifferenceAndGrace`, except for the `grace` parameter. Is there any reason why they differ?
   
   The same comment applies to other places.
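For reference, the grace-period rule described in the `ofTimeDifferenceAndGrace` javadoc above can be modeled roughly as follows. This is a minimal sketch, not Kafka's implementation: `GraceCheck` and `isAccepted` are hypothetical names, and it assumes a window counts as closed once stream time reaches the window end plus the grace period (the exact boundary handling is an assumption and may differ between operators).

```java
// Simplified model, not Kafka code: a window is assumed closed once
// stream time has advanced to windowEnd + grace.
final class GraceCheck {
    private GraceCheck() { }

    /** Returns true if a record whose window ends at windowEndMs is still accepted. */
    static boolean isAccepted(final long windowEndMs, final long graceMs, final long streamTimeMs) {
        // Every window ending at or before closeTime is considered closed.
        final long closeTime = streamTimeMs - graceMs;
        return windowEndMs > closeTime;
    }

    public static void main(final String[] args) {
        // Window ends at 100 ms.
        System.out.println(isAccepted(100L, 10L, 105L)); // true: still within the grace period
        System.out.println(isAccepted(100L, 10L, 110L)); // false: grace period has elapsed
        System.out.println(isAccepted(100L, 0L, 101L));  // false: no grace period at all
    }
}
```

With a grace period of zero, as in `ofTimeDifferenceWithNoGrace`, any record arriving after stream time has passed the window end is dropped.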

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -83,6 +83,14 @@ protected JoinWindows(final JoinWindows joinWindows) {
         afterMs = joinWindows.afterMs;
         graceMs = joinWindows.graceMs;
         enableSpuriousResultFix = joinWindows.enableSpuriousResultFix;
+
+        if (beforeMs + afterMs < 0) {
+            throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative.");

Review comment:
       Could we make this constructor call the other overloaded constructor to avoid duplicated code? i.e.:
   ```java
   protected JoinWindows(final JoinWindows joinWindows) {
       this(joinWindows.beforeMs, joinWindows.afterMs, joinWindows.graceMs, joinWindows.enableSpuriousResultFix);
   }
   ```
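A self-contained sketch of the suggested delegation, assuming a hypothetical `JoinWindowsSketch` class that mirrors only the four fields and the two checks shown in the diff. Routing the copy constructor through `this(...)` keeps every invariant in one place, so a later change to the checks cannot be missed in one of the constructors.

```java
// Hypothetical stand-in for JoinWindows (not Kafka's class): only the
// constructor logic from the diff is modeled.
class JoinWindowsSketch {
    final long beforeMs;
    final long afterMs;
    final long graceMs;
    final boolean enableSpuriousResultFix;

    // The single place where the invariants are checked.
    JoinWindowsSketch(final long beforeMs,
                      final long afterMs,
                      final long graceMs,
                      final boolean enableSpuriousResultFix) {
        if (beforeMs + afterMs < 0) {
            throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative.");
        }
        if (graceMs < 0) {
            throw new IllegalArgumentException("Grace period must not be negative.");
        }
        this.beforeMs = beforeMs;
        this.afterMs = afterMs;
        this.graceMs = graceMs;
        this.enableSpuriousResultFix = enableSpuriousResultFix;
    }

    // Copy constructor: delegates instead of duplicating the checks.
    JoinWindowsSketch(final JoinWindowsSketch other) {
        this(other.beforeMs, other.afterMs, other.graceMs, other.enableSpuriousResultFix);
    }

    public static void main(final String[] args) {
        final JoinWindowsSketch copy = new JoinWindowsSketch(new JoinWindowsSketch(100L, 100L, 0L, true));
        System.out.println(copy.beforeMs + copy.afterMs); // 200
    }
}
```

Re-validating an instance that was already validated is cheap, so the delegation costs essentially nothing.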

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
##########
@@ -108,13 +147,12 @@ public static SessionWindows with(final Duration inactivityGap) {
      * @param afterWindowEnd The grace period to admit out-of-order events to a window.
      * @return this updated builder
      * @throws IllegalArgumentException if the {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds}
+     * @deprecated since 3.0 Use {@link #ofInactivityGapAndGrace(Duration, Duration)} instead
      */
+    @Deprecated
     public SessionWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
         final long afterWindowEndMs = validateMillisecondDuration(afterWindowEnd, msgPrefix);
-        if (afterWindowEndMs < 0) {
-            throw new IllegalArgumentException("Grace period must not be negative.");
-        }

Review comment:
       It's nice that you moved this check into the constructor. But there is other validation you didn't move into the constructor, e.g. `validateMillisecondDuration(afterWindowEnd, msgPrefix)`.
   
   We used to have 2 ways to create `SessionWindows`: `grace` and `with`. Now we've added 2 more: `ofInactivityGapAndGrace` and `ofInactivityGapAndNoGrace`. We should validate the parameters for all of them, either in the constructor or in each method.
   
   The same comment applies to other places.
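One way to follow this suggestion is to make every factory method funnel into a single validating constructor. The sketch below uses a hypothetical `SessionWindowsSketch` class, not Kafka's `SessionWindows`; the factory names mirror the ones discussed here, and the zero/negative gap check is an assumption added for illustration.

```java
import java.time.Duration;

// Hypothetical stand-in for SessionWindows (not Kafka's class). Every way of
// creating an instance ends up in the same private constructor.
final class SessionWindowsSketch {
    final long gapMs;
    final long graceMs;

    private SessionWindowsSketch(final long gapMs, final long graceMs) {
        if (gapMs <= 0) {
            throw new IllegalArgumentException("Gap time cannot be zero or negative.");
        }
        if (graceMs < 0) {
            throw new IllegalArgumentException("Grace period must not be negative.");
        }
        this.gapMs = gapMs;
        this.graceMs = graceMs;
    }

    // Note: Duration.toMillis() throws ArithmeticException on overflow; this
    // sketch does not translate that into IllegalArgumentException.
    static SessionWindowsSketch ofInactivityGapAndGrace(final Duration inactivityGap, final Duration afterWindowEnd) {
        return new SessionWindowsSketch(inactivityGap.toMillis(), afterWindowEnd.toMillis());
    }

    static SessionWindowsSketch ofInactivityGapAndNoGrace(final Duration inactivityGap) {
        return new SessionWindowsSketch(inactivityGap.toMillis(), 0L);
    }

    // The deprecated builder method routes through the same checks.
    @Deprecated
    SessionWindowsSketch grace(final Duration afterWindowEnd) {
        return new SessionWindowsSketch(gapMs, afterWindowEnd.toMillis());
    }

    public static void main(final String[] args) {
        System.out.println(ofInactivityGapAndNoGrace(Duration.ofMinutes(5)).gapMs); // 300000
        try {
            ofInactivityGapAndGrace(Duration.ofMinutes(5), Duration.ofSeconds(-1));
        } catch (final IllegalArgumentException e) {
            System.out.println(e.getMessage()); // Grace period must not be negative.
        }
    }
}
```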

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
##########
@@ -89,18 +132,16 @@ private SlidingWindows(final long timeDifferenceMs, final long graceMs) {
      * @param grace the grace period to admit out-of-order events to a window
      * @return a new window definition
      * @throws IllegalArgumentException if the specified window size is &lt; 0 or grace &lt; 0, or either can't be represented as {@code long milliseconds}
+     * @deprecated since 3.0 Use {@link #ofTimeDifferenceWithNoGrace(Duration)} or {@link #ofTimeDifferenceAndGrace(Duration, Duration)} instead
      */
+    @Deprecated
     public static SlidingWindows withTimeDifferenceAndGrace(final Duration timeDifference, final Duration grace) throws IllegalArgumentException {
         final String msgPrefixSize = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
         final long timeDifferenceMs = validateMillisecondDuration(timeDifference, msgPrefixSize);
-        if (timeDifferenceMs < 0) {
-            throw new IllegalArgumentException("Window time difference must not be negative.");
-        }
+
         final String msgPrefixGrace = prepareMillisCheckFailMsgPrefix(grace, "grace");
         final long graceMs = validateMillisecondDuration(grace, msgPrefixGrace);

Review comment:
       Same as mentioned above: this validation isn't handled in the new APIs.
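The overflow case matters for the new factories too: the `JoinWindows` factories in this diff call `Duration.toMillis()` directly, and `toMillis()` throws `ArithmeticException` when the value does not fit in a `long`, while the javadoc promises `IllegalArgumentException`. A minimal sketch, assuming a hypothetical helper `toValidatedMillis` that plays the role `validateMillisecondDuration` plays in the deprecated methods (Kafka's exact behavior is not reproduced here):

```java
import java.time.Duration;

// Hypothetical helper, not Kafka's API: converts a Duration to milliseconds
// and reports every failure as IllegalArgumentException.
final class MillisValidation {
    private MillisValidation() { }

    static long toValidatedMillis(final Duration duration, final String name) {
        if (duration == null) {
            throw new IllegalArgumentException(name + " must not be null");
        }
        long millis;
        try {
            millis = duration.toMillis();
        } catch (final ArithmeticException e) {
            // Overflow: the Duration does not fit in a long number of milliseconds.
            throw new IllegalArgumentException(name + " can't be represented as long milliseconds", e);
        }
        if (millis < 0) {
            throw new IllegalArgumentException(name + " must not be negative");
        }
        return millis;
    }

    // Both parameters of an ofTimeDifferenceAndGrace-style factory get the same checks.
    static long[] validateSlidingWindow(final Duration timeDifference, final Duration grace) {
        return new long[] {
            toValidatedMillis(timeDifference, "timeDifference"),
            toValidatedMillis(grace, "grace")
        };
    }

    public static void main(final String[] args) {
        System.out.println(toValidatedMillis(Duration.ofSeconds(2), "grace")); // 2000
        try {
            toValidatedMillis(Duration.ofSeconds(Long.MAX_VALUE), "timeDifference");
        } catch (final IllegalArgumentException e) {
            System.out.println(e.getMessage()); // timeDifference can't be represented as long milliseconds
        }
    }
}
```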

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -92,32 +100,61 @@ private JoinWindows(final long beforeMs,
         if (beforeMs + afterMs < 0) {
             throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative.");
         }
+
+        if (graceMs < 0) {
+            throw new IllegalArgumentException("Grace period must not be negative.");
+        }
+
         this.afterMs = afterMs;
         this.beforeMs = beforeMs;
         this.graceMs = graceMs;
         this.enableSpuriousResultFix = enableSpuriousResultFix;
     }
 
+    /**
+     * Reject out-of-order events that are delayed more than {@code afterWindowEnd}
+     * after the end of its window.
+     * <p>
+     * Delay is defined as (stream_time - record_timestamp).
+     *
+     * @param timeDifference join window interval
+     * @param afterWindowEnd The grace period to admit out-of-order events to a window.
+     * @throws IllegalArgumentException if the {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds}
+     * @return A new JoinWindows object with the specified window definition and grace period
+     */
     public static JoinWindows ofTimeDifferenceAndGrace(final Duration timeDifference, final Duration afterWindowEnd) {
         return new JoinWindows(timeDifference.toMillis(), timeDifference.toMillis(), afterWindowEnd.toMillis(), true);
     }
 
+    /**
+     * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
+     * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
+     * the timestamp of the record from the primary stream. Using the method implicitly sets the grace period to zero
+     * which means that out of order records arriving after the window end will be dropped.
+     *
+     * @param timeDifference join window interval
+     * @throws IllegalArgumentException if {@code timeDifference} is negative or can't be represented as {@code long milliseconds}
+     * @return a new JoinWindows object with the window definition and no grace period. Note that this means out of order records arriving after the window end will be dropped
+     */
     public static JoinWindows ofTimeDifferenceWithNoGrace(final Duration timeDifference) {
-        return new JoinWindows(timeDifference.toMillis(), timeDifference.toMillis(), 0L, true);
+        return new JoinWindows(timeDifference.toMillis(), timeDifference.toMillis(), NO_GRACE_PERIOD, true);
     }
 
-     /**
+    /**
      * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
      * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
      * the timestamp of the record from the primary stream.
      *
-     * @param timeDifference join window interval
+     * @param timeDifference
+     * @return a new JoinWindows object with the window definition and a grace period (uses the old default of 24 hours)
      * @throws IllegalArgumentException if {@code timeDifference} is negative or can't be represented as {@code long milliseconds}
+     * @deprecated since 3.0 Use {@link #ofTimeDifferenceAndGrace(Duration, Duration)} instead

Review comment:
       Please add a period after `@deprecated since 3.0`, i.e.
   `@deprecated since 3.0. Use {@link #ofTimeDifferenceAndGrace(Duration, Duration)} instead`
   
   The same comment applies to the identical places below.
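For illustration, a deprecated factory carrying the suggested javadoc wording might look like the sketch below. `DeprecationSketch` is a hypothetical stand-in for `JoinWindows`; that the deprecated method is named `of(Duration)` and keeps the 24-hour default by delegating to `ofTimeDifferenceAndGrace` is an assumption based on the javadoc in the diff above.

```java
import java.time.Duration;

// Hypothetical stand-in for JoinWindows (not Kafka's class), reduced to the
// pieces needed to show the deprecation pattern.
final class DeprecationSketch {
    final long beforeMs;
    final long afterMs;
    final long graceMs;

    private DeprecationSketch(final long beforeMs, final long afterMs, final long graceMs) {
        this.beforeMs = beforeMs;
        this.afterMs = afterMs;
        this.graceMs = graceMs;
    }

    /**
     * @param timeDifference join window interval
     * @param afterWindowEnd the grace period to admit out-of-order events to a window
     * @return a new window definition with the given grace period
     */
    static DeprecationSketch ofTimeDifferenceAndGrace(final Duration timeDifference, final Duration afterWindowEnd) {
        return new DeprecationSketch(timeDifference.toMillis(), timeDifference.toMillis(), afterWindowEnd.toMillis());
    }

    /**
     * @param timeDifference join window interval
     * @return a new window definition with a grace period of 24 hours
     * @deprecated since 3.0. Use {@link #ofTimeDifferenceAndGrace(Duration, Duration)} instead
     */
    @Deprecated
    static DeprecationSketch of(final Duration timeDifference) {
        // Keeps the previous 24-hour default by delegating to the new factory.
        return ofTimeDifferenceAndGrace(timeDifference, Duration.ofHours(24));
    }

    public static void main(final String[] args) {
        System.out.println(of(Duration.ofSeconds(5)).graceMs); // 86400000
    }
}
```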

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
##########
@@ -40,7 +40,11 @@
 
     // By default grace period is 24 hours for all windows,
     // in other words we allow out-of-order data for up to a day
-    protected static final long DEFAULT_GRACE_PERIOD_MS = 24 * 60 * 60 * 1000L;
+    // This behavior is now deprecated
+    protected static final long DEPRECATED_OLD_24_HR_GRACE_PERIOD = 24 * 60 * 60 * 1000L;

Review comment:
       This naming is confusing to users. It will make users think that a 24-hour grace period cannot be used anymore, and I don't think that's what we want. A 24-hour grace period is still fine to use if users believe that's what they need. That is, we don't **deprecate** the 24-hour grace period; we just no longer use it as the default value. So we should not put words like "deprecated" or "old" in its name.
   
   Correct me if I'm wrong, @izzyacademy @ableegoldman. Thank you!
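To make the point concrete: 24 hours is just an ordinary grace value, identical to what a user gets from `Duration.ofHours(24)`. The constant name below is hypothetical and only shows a name that describes the value rather than its status.

```java
import java.time.Duration;

// Illustration only: a 24-hour grace period is an ordinary, supported value.
final class GracePeriodNaming {
    // Hypothetical neutral name: it describes the value, not whether it is "old".
    static final long GRACE_PERIOD_24_HOURS_MS = 24 * 60 * 60 * 1000L;

    public static void main(final String[] args) {
        System.out.println(GRACE_PERIOD_24_HOURS_MS);        // 86400000
        System.out.println(Duration.ofHours(24).toMillis()); // 86400000
    }
}
```

With the API in this PR, a user who still wants a day of grace can pass it explicitly, e.g. `JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMinutes(5), Duration.ofHours(24))`.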




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.