Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/06/24 23:16:29 UTC

[GitHub] [kafka] ableegoldman commented on a change in pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations

ableegoldman commented on a change in pull request #10926:
URL: https://github.com/apache/kafka/pull/10926#discussion_r658345192



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
##########
@@ -83,16 +92,52 @@ private TimeWindows(final long sizeMs, final long advanceMs, final long graceMs)
      * Tumbling windows are a special case of hopping windows with {@code advance == size}.
      *
      * @param size The size of the window
-     * @return a new window definition with default maintain duration of 1 day
+     * @return a new window definition with default no grace period
+     * @throws IllegalArgumentException if the specified window size is zero or negative or can't be represented as {@code long milliseconds}
+     */
+    public static TimeWindows ofSizeWithNoGrace(final Duration size) throws IllegalArgumentException {
+        return ofSizeAndGrace(size, ofMillis(NO_GRACE_PERIOD));
+    }
+
+    /**
+     * Reject out-of-order events that arrive more than {@code millisAfterWindowEnd} after the end of its window.

Review comment:
       The javadocs here should contain at least everything in the `ofSizeWithNoGrace` javadocs, plus a bit more detail on the grace period, since both methods create a new tumbling window definition.
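
As a rough illustration of the point above, the sketch below shows how the two factory methods could carry parallel javadocs, with the grace-period variant adding detail. `TumblingWindowSketch` and its helper `toMillis` are hypothetical stand-ins written only against `java.time.Duration`; this is not the real `TimeWindows` class, and the javadoc wording is an assumption, not the text that was merged.

```java
import java.time.Duration;

/** Hypothetical stand-in for a tumbling window definition (not the real TimeWindows). */
final class TumblingWindowSketch {
    final long sizeMs;
    final long graceMs;

    private TumblingWindowSketch(final long sizeMs, final long graceMs) {
        if (sizeMs <= 0) {
            throw new IllegalArgumentException("Window size must be larger than zero.");
        }
        if (graceMs < 0) {
            throw new IllegalArgumentException("Grace period must not be negative.");
        }
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
    }

    /**
     * Return a new tumbling window definition with the given size and no grace period.
     * Out-of-order records that arrive after the window end will be dropped.
     *
     * @param size the size of the window
     * @return a new window definition with no grace period
     * @throws IllegalArgumentException if the size is zero, negative, or too large for long milliseconds
     */
    static TumblingWindowSketch ofSizeWithNoGrace(final Duration size) {
        return ofSizeAndGrace(size, Duration.ZERO);
    }

    /**
     * Return a new tumbling window definition with the given size and grace period.
     * Out-of-order records are still admitted to a window until {@code afterWindowEnd}
     * has elapsed after the window end; later records are dropped.
     *
     * @param size the size of the window
     * @param afterWindowEnd the grace period to admit out-of-order records to a window
     * @return a new window definition with the specified grace period
     * @throws IllegalArgumentException if an argument is out of range or too large for long milliseconds
     */
    static TumblingWindowSketch ofSizeAndGrace(final Duration size, final Duration afterWindowEnd) {
        return new TumblingWindowSketch(toMillis(size, "size"), toMillis(afterWindowEnd, "afterWindowEnd"));
    }

    // Duration.toMillis() signals overflow with ArithmeticException; translate it so
    // callers only see the exception type that the javadocs promise.
    private static long toMillis(final Duration duration, final String name) {
        try {
            return duration.toMillis();
        } catch (final ArithmeticException e) {
            throw new IllegalArgumentException(name + " can't be represented as long milliseconds", e);
        }
    }
}
```

Having the no-grace method delegate to the grace-period method keeps validation in one place, which also makes it easier to keep the two javadocs in step.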

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
##########
@@ -71,6 +71,7 @@
 /**
  * Tests related to internal topics in streams
  */
+@SuppressWarnings("deprecation")

Review comment:
       Ok, I see now that there are a lot of tests using the deprecated methods. We should absolutely migrate them all to the new APIs (of course we will be forced to once the deprecated ones are removed), but it's ok with me, and maybe even slightly preferable, to do this in a follow-up PR. That way we can keep this PR focused on the changes themselves and just the tests for the new APIs.
   
   Can you file a ticket to migrate all of the remaining tests over to the new APIs and remove the warning suppression? Then you can start working on that after this PR is merged, or in parallel with the review but on a separate branch.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -92,32 +100,58 @@ private JoinWindows(final long beforeMs,
         if (beforeMs + afterMs < 0) {
             throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative.");
         }
+
+        if (graceMs < 0) {
+            throw new IllegalArgumentException("Grace period must not be negative.");
+        }
+
         this.afterMs = afterMs;
         this.beforeMs = beforeMs;
         this.graceMs = graceMs;
         this.enableSpuriousResultFix = enableSpuriousResultFix;
     }
 
+    /**
+     * Reject out-of-order events that are delayed more than {@code afterWindowEnd}
+     * after the end of its window.
+     * <p>
+     * Delay is defined as (stream_time - record_timestamp).
+     *
+     * @param timeDifference join window interval
+     * @param afterWindowEnd The grace period to admit out-of-order events to a window.
+     * @throws IllegalArgumentException if the {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds}
+     */
     public static JoinWindows ofTimeDifferenceAndGrace(final Duration timeDifference, final Duration afterWindowEnd) {
         return new JoinWindows(timeDifference.toMillis(), timeDifference.toMillis(), afterWindowEnd.toMillis(), true);
     }
 
+    /**
+     * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
+     * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
+     * the timestamp of the record from the primary stream.
+     *
+     * @param timeDifference join window interval
+     * @throws IllegalArgumentException if {@code timeDifference} is negative or can't be represented as {@code long milliseconds}
+     */
     public static JoinWindows ofTimeDifferenceWithNoGrace(final Duration timeDifference) {
-        return new JoinWindows(timeDifference.toMillis(), timeDifference.toMillis(), 0L, true);
+        return new JoinWindows(timeDifference.toMillis(), timeDifference.toMillis(), NO_GRACE_PERIOD, true);
     }
 
-     /**
+    /**
      * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
      * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
      * the timestamp of the record from the primary stream.
      *
-     * @param timeDifference join window interval
+     * @param timeDifference
+     * @return

Review comment:
       nit: either remove this line or fill it out, e.g. `@return a new JoinWindows specification with a 24-hour grace period`. Probably good to make sure this javadoc is consistent across all the builder methods in this class, i.e. either all methods have the `@return` specified, or none of them do. Same for any of the other windows classes.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
##########
@@ -341,6 +341,7 @@ private void testReprocessingFromScratchAfterResetWithIntermediateUserTopic(fina
         }
     }
 
+    @SuppressWarnings("deprecation")

Review comment:
       Same here, and for all the tests really -- any test should be migrated to use the non-deprecated version of the API, unless it's explicitly testing the behavior of the old, now-deprecated, method.

##########
File path: streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
##########
@@ -62,6 +62,7 @@
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.fail;
 
+@SuppressWarnings("deprecation")

Review comment:
       We should migrate this to use the new non-deprecated methods instead of just suppressing the warning.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
##########
@@ -83,16 +92,52 @@ private TimeWindows(final long sizeMs, final long advanceMs, final long graceMs)
      * Tumbling windows are a special case of hopping windows with {@code advance == size}.
      *
      * @param size The size of the window
-     * @return a new window definition with default maintain duration of 1 day
+     * @return a new window definition with default no grace period
+     * @throws IllegalArgumentException if the specified window size is zero or negative or can't be represented as {@code long milliseconds}
+     */
+    public static TimeWindows ofSizeWithNoGrace(final Duration size) throws IllegalArgumentException {

Review comment:
       We should call out explicitly that this is setting the grace period to 0, which means that out-of-order records arriving after the window end will be dropped. Otherwise it's too easy to just use this method without thinking any further about the grace period, what it means, and whether you want it.
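
To make the consequence concrete, here is a minimal, simplified model of the drop rule. It assumes a window stops accepting records once stream time reaches the window end plus the grace period; the class `GraceCheckSketch`, the method `isAccepted`, and the exact boundary condition are illustrative assumptions, not the Kafka Streams implementation.

```java
/** Hypothetical model of how a grace period decides whether a late record is kept. */
final class GraceCheckSketch {

    /**
     * Assumed rule: a record belonging to the window that ends at windowEndMs is
     * accepted only while stream time is still before windowEndMs + graceMs.
     */
    static boolean isAccepted(final long windowEndMs, final long graceMs, final long streamTimeMs) {
        return streamTimeMs < windowEndMs + graceMs;
    }

    public static void main(final String[] args) {
        final long windowEndMs = 10_000L;
        final long streamTimeMs = 12_000L; // stream time has already moved past the window end

        // Grace period of 0: the out-of-order record is dropped.
        System.out.println(isAccepted(windowEndMs, 0L, streamTimeMs));     // false

        // Grace period of 5 seconds: the same record is still admitted.
        System.out.println(isAccepted(windowEndMs, 5_000L, streamTimeMs)); // true
    }
}
```

With a grace period of 0, any record whose window has already ended by the time it arrives is lost, which is why the javadoc should say so up front.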

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
##########
@@ -78,6 +82,43 @@
     private SlidingWindows(final long timeDifferenceMs, final long graceMs) {
         this.timeDifferenceMs = timeDifferenceMs;
         this.graceMs = graceMs;
+
+        if (timeDifferenceMs < 0) {
+            throw new IllegalArgumentException("Window time difference must not be negative.");
+        }
+
+        if (graceMs < 0) {
+            throw new IllegalArgumentException("Window grace period must not be negative.");
+        }
+    }
+
+    /**
+     * Return a window definition with the window size

Review comment:
       ```suggestion
        * Return a window definition with the window size and no grace period. Note that this means out of order records arriving after the window end will be dropped.
   ```



