Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/06/24 19:44:05 UTC

[GitHub] [kafka] izzyacademy opened a new pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations

izzyacademy opened a new pull request #10926:
URL: https://github.com/apache/kafka/pull/10926


   [KAFKA-8613] Make grace period mandatory
   
   KIP-633 New APIs for Controlling Grace Period for Windowed Operations
   
   - Added the API changes from KIP-633 for JoinWindows, SessionWindows, TimeWindows and SlidingWindows
   - Renamed Windows.DEFAULT_GRACE_PERIOD_MS to DEPRECATED_OLD_24_HR_GRACE_PERIOD
   - Added a new constant Windows.NO_GRACE_PERIOD to avoid magic constants when 0 is specified as the grace period
   - Added preliminary Java unit test cases for the new API methods
   - Replaced deprecated calls with their equivalents in the examples
   - Replaced deprecated API calls in Scala tests with the updated API method calls
   - Added deprecation suppression in Java and Scala tests that still call deprecated API methods
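The bullets above describe paired factories (for example `TimeWindows.ofSizeWithNoGrace` and `TimeWindows.ofSizeAndGrace`) plus a named `NO_GRACE_PERIOD` constant. A minimal sketch of that pattern follows, assuming a simplified, hypothetical `TumblingWindowSpec` class rather than the real Kafka Streams types: the no-grace variant delegates to the explicit one, and invalid sizes or negative grace periods are rejected up front, in the same spirit as the constructor checks quoted later in this thread.

```java
import java.time.Duration;

// Hypothetical, simplified stand-in for a tumbling-window definition; it only
// mirrors the shape of the KIP-633 factory pair and is not Kafka's TimeWindows.
final class TumblingWindowSpec {
    // Named constant instead of a magic 0, in the spirit of Windows.NO_GRACE_PERIOD.
    static final long NO_GRACE_PERIOD = 0L;

    final long sizeMs;
    final long graceMs;

    private TumblingWindowSpec(final long sizeMs, final long graceMs) {
        if (sizeMs <= 0) {
            throw new IllegalArgumentException("Window size must be larger than zero.");
        }
        if (graceMs < 0) {
            throw new IllegalArgumentException("Grace period must not be negative.");
        }
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
    }

    // The caller must state the grace period explicitly.
    static TumblingWindowSpec ofSizeAndGrace(final Duration size, final Duration afterWindowEnd) {
        return new TumblingWindowSpec(size.toMillis(), afterWindowEnd.toMillis());
    }

    // Delegates with a zero grace period: out-of-order records that arrive
    // after the window has closed are dropped.
    static TumblingWindowSpec ofSizeWithNoGrace(final Duration size) {
        return ofSizeAndGrace(size, Duration.ofMillis(NO_GRACE_PERIOD));
    }
}
```

For example, `TumblingWindowSpec.ofSizeWithNoGrace(Duration.ofMinutes(5))` yields `sizeMs == 300000` and `graceMs == 0`, while passing a negative grace period to `ofSizeAndGrace` throws `IllegalArgumentException`.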
   
   @ableegoldman @mjsax @cadonna @showuon please review when you have a moment
   
   I apologize in advance, but I was unable to merge the new changes into the old PR:
   https://github.com/apache/kafka/pull/10740
   
   I spent a lot of time trying to rebase and merge, but it just did not work, so I had to create a new branch and a new PR without the conflicts.
   
   modified:   streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
   modified:   streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
   modified:   streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
   modified:   streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
   modified:   streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
   
   modified:   streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
   modified:   streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
   modified:   streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
   
   modified:   streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
   modified:   streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
   modified:   streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
   modified:   streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
   
   modified:   streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
   modified:   streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
   modified:   streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
   modified:   streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
   modified:   streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
   modified:   streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
   modified:   streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
   modified:   streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
   modified:   streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
   modified:   streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
   modified:   streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
   
   modified:   streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
   modified:   streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
   modified:   streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
   modified:   streams/src/test/java/org/apache/kafka/streams/kstream/SlidingWindowsTest.java
   modified:   streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
   modified:   streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
   modified:   streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
   modified:   streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
   modified:   streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
   modified:   streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
   modified:   streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamRepartitionTest.java
   modified:   streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
   modified:   streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
   modified:   streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
   modified:   streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImplTest.java
   modified:   streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
   modified:   streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java
   modified:   streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImplTest.java
   modified:   streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
   modified:   streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressTopologyTest.java
   modified:   streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java
   modified:   streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImplTest.java
   modified:   streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
   modified:   streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
   modified:   streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
   modified:   streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
   modified:   streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
   modified:   streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
   modified:   streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java
   modified:   streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java
   modified:   streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] izzyacademy commented on a change in pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations

Posted by GitBox <gi...@apache.org>.
izzyacademy commented on a change in pull request #10926:
URL: https://github.com/apache/kafka/pull/10926#discussion_r658381873



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
##########
@@ -71,6 +71,7 @@
 /**
  * Tests related to internal topics in streams
  */
+@SuppressWarnings("deprecation")

Review comment:
       @ableegoldman I have created JIRA ticket KAFKA-12994 to track migrating the tests to the new APIs and then removing the deprecation warnings.
   
   https://issues.apache.org/jira/browse/KAFKA-12994
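The class-level `@SuppressWarnings("deprecation")` quoted above silences every deprecation warning in the test class. For illustration only, a minimal sketch of a narrower scope, placing the annotation on the single method that still calls a deprecated API; `LegacyWindowApi` and `LegacyWindowApiUsage` are hypothetical placeholders, not Kafka classes.

```java
// Hypothetical API standing in for a window factory that was deprecated in favour of a new one.
final class LegacyWindowApi {
    /**
     * @deprecated since 3.0. Use {@link #newFactory(long)} instead.
     */
    @Deprecated
    static long oldFactory(final long sizeMs) {
        return newFactory(sizeMs);
    }

    static long newFactory(final long sizeMs) {
        return sizeMs;
    }
}

// Hypothetical test helper: the suppression is scoped to the one method that
// still exercises the deprecated API, so any other deprecated call added to
// this class later is still reported (for example under -Xlint:deprecation).
final class LegacyWindowApiUsage {
    @SuppressWarnings("deprecation")
    static long viaDeprecatedApi() {
        return LegacyWindowApi.oldFactory(1_000L);
    }

    static long viaNewApi() {
        return LegacyWindowApi.newFactory(1_000L);
    }
}
```

Class-level suppression is simpler while many tests still use the old API; method-level suppression makes the remaining migration work easier to find.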







[GitHub] [kafka] mjsax commented on a change in pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10926:
URL: https://github.com/apache/kafka/pull/10926#discussion_r661136400



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -92,32 +89,62 @@ private JoinWindows(final long beforeMs,
         if (beforeMs + afterMs < 0) {
             throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative.");
         }
+
+        if (graceMs < 0) {
+            throw new IllegalArgumentException("Grace period must not be negative.");
+        }
+
         this.afterMs = afterMs;
         this.beforeMs = beforeMs;
         this.graceMs = graceMs;
         this.enableSpuriousResultFix = enableSpuriousResultFix;
     }
 
+    /**
+     * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
+     * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
+     * the timestamp of the record from the primary stream. Using the method explicitly sets the grace period to
+     * the duration specified by {@code afterWindowEnd} which means that out of order records arriving

Review comment:
       `out-of-order` (with `-`)

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
##########
@@ -81,18 +90,63 @@ private TimeWindows(final long sizeMs, final long advanceMs, final long graceMs)
      * <p>
      * This provides the semantics of tumbling windows, which are fixed-sized, gap-less, non-overlapping windows.
      * Tumbling windows are a special case of hopping windows with {@code advance == size}.
+     * Using the method implicitly sets the grace period to zero which means
+     * that out of order records arriving after the window end will be dropped
      *
      * @param size The size of the window
-     * @return a new window definition with default maintain duration of 1 day
+     * @return a new window definition with default no grace period. Note that this means out of order records arriving after the window end will be dropped
+     * @throws IllegalArgumentException if the specified window size is zero or negative or can't be represented as {@code long milliseconds}
+     */
+    public static TimeWindows ofSizeWithNoGrace(final Duration size) throws IllegalArgumentException {
+        return ofSizeAndGrace(size, ofMillis(NO_GRACE_PERIOD));
+    }
+
+    /**
+     * Return a window definition with the given window size, and with the advance interval being equal to the window
+     * size.
+     * The time interval represented by the N-th window is: {@code [N * size, N * size + size)}.
+     * <p>
+     * This provides the semantics of tumbling windows, which are fixed-sized, gap-less, non-overlapping windows.
+     * Tumbling windows are a special case of hopping windows with {@code advance == size}.
+     * Using the method explicitly sets the grace period to the duration specified by {@code afterWindowEnd} which means
+     * that out of order records arriving after the window end will be dropped.
+     *
+     * <p>
+     * Delay is defined as (stream_time - record_timestamp).
+     *
+     * @param size The size of the window. Must be larger than zero
+     * @param afterWindowEnd The grace period to admit out-of-order events to a window. Must be non-negative.
+     * @return a TimeWindows object with the specified size and the specified grace period
+     * @throws IllegalArgumentException if {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds}
+     */
+    public static TimeWindows ofSizeAndGrace(final Duration size, final Duration afterWindowEnd)
+            throws IllegalArgumentException {

Review comment:
       nit: move to the previous line?
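The javadoc quoted above describes the N-th tumbling window as `[N * size, N * size + size)` and states that, with no grace period, out-of-order records arriving after the window end are dropped. A small model of that arithmetic follows. It is not Kafka Streams code, and it assumes a window keeps accepting records while the observed stream time is strictly less than `windowEnd + grace`.

```java
// Simplified model (not Kafka Streams code) of tumbling-window assignment and
// grace-period handling. Assumption: a window accepts a record while the
// observed stream time is strictly less than windowEnd + grace.
final class TumblingWindowModel {
    final long sizeMs;
    final long graceMs;

    TumblingWindowModel(final long sizeMs, final long graceMs) {
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
    }

    // Start of the N-th window containing the timestamp: N * size, with N = floor(ts / size).
    long windowStart(final long timestampMs) {
        return Math.floorDiv(timestampMs, sizeMs) * sizeMs;
    }

    // Windows are half-open intervals [start, start + size).
    long windowEnd(final long timestampMs) {
        return windowStart(timestampMs) + sizeMs;
    }

    // True if the record's window is still open at the given stream time.
    boolean accepts(final long recordTimestampMs, final long streamTimeMs) {
        return streamTimeMs < windowEnd(recordTimestampMs) + graceMs;
    }
}
```

With a 10 ms window and no grace, a record stamped 7 falls into `[0, 10)` and is rejected once stream time has reached 12; with a 5 ms grace the same record is still accepted at stream time 12, but not at 15.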

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -92,32 +89,62 @@ private JoinWindows(final long beforeMs,
         if (beforeMs + afterMs < 0) {
             throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative.");
         }
+
+        if (graceMs < 0) {
+            throw new IllegalArgumentException("Grace period must not be negative.");
+        }
+
         this.afterMs = afterMs;
         this.beforeMs = beforeMs;
         this.graceMs = graceMs;
         this.enableSpuriousResultFix = enableSpuriousResultFix;
     }
 
+    /**
+     * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
+     * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
+     * the timestamp of the record from the primary stream. Using the method explicitly sets the grace period to
+     * the duration specified by {@code afterWindowEnd} which means that out of order records arriving
+     * after the window end will be dropped. The delay is defined as (stream_time - record_timestamp).
+     *
+     * @param timeDifference join window interval
+     * @param afterWindowEnd The grace period to admit out-of-order events to a window.
+     * @throws IllegalArgumentException if the {@code afterWindowEnd} is negative of can't be represented as {@code long milliseconds}
+     * @return A new JoinWindows object with the specified window definition and grace period
+     */
     public static JoinWindows ofTimeDifferenceAndGrace(final Duration timeDifference, final Duration afterWindowEnd) {
         return new JoinWindows(timeDifference.toMillis(), timeDifference.toMillis(), afterWindowEnd.toMillis(), true);
     }
 
+    /**
+     * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
+     * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
+     * the timestamp of the record from the primary stream. Using the method implicitly sets the grace period to zero
+     * which means that out of order records arriving after the window end will be dropped.
+     *
+     * @param timeDifference join window interval
+     * @throws IllegalArgumentException if {@code timeDifference} is negative or can't be represented as {@code long milliseconds}
+     * @return a new JoinWindows object with the window definition and no grace period. Note that this means out of order records arriving after the window end will be dropped
+     */
     public static JoinWindows ofTimeDifferenceWithNoGrace(final Duration timeDifference) {
-        return new JoinWindows(timeDifference.toMillis(), timeDifference.toMillis(), 0L, true);
+        return new JoinWindows(timeDifference.toMillis(), timeDifference.toMillis(), NO_GRACE_PERIOD, true);
     }
 
-     /**
+    /**
      * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
      * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
      * the timestamp of the record from the primary stream.
      *
-     * @param timeDifference join window interval
+     * @param timeDifference

Review comment:
       Why is `join window interval` removed?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -92,32 +100,61 @@ private JoinWindows(final long beforeMs,
         if (beforeMs + afterMs < 0) {
             throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative.");
         }
+
+        if (graceMs < 0) {
+            throw new IllegalArgumentException("Grace period must not be negative.");
+        }
+
         this.afterMs = afterMs;
         this.beforeMs = beforeMs;
         this.graceMs = graceMs;
         this.enableSpuriousResultFix = enableSpuriousResultFix;
     }
 
+    /**
+     * Reject out-of-order events that are delayed more than {@code afterWindowEnd}
+     * after the end of its window.
+     * <p>
+     * Delay is defined as (stream_time - record_timestamp).
+     *
+     * @param timeDifference join window interval
+     * @param afterWindowEnd The grace period to admit out-of-order events to a window.
+     * @throws IllegalArgumentException if the {@code afterWindowEnd} is negative of can't be represented as {@code long milliseconds}
+     * @return A new JoinWindows object with the specified window definition and grace period
+     */
     public static JoinWindows ofTimeDifferenceAndGrace(final Duration timeDifference, final Duration afterWindowEnd) {
         return new JoinWindows(timeDifference.toMillis(), timeDifference.toMillis(), afterWindowEnd.toMillis(), true);
     }
 
+    /**
+     * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
+     * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
+     * the timestamp of the record from the primary stream. Using the method implicitly sets the grace period to zero
+     * which means that out of order records arriving after the window end will be dropped.
+     *
+     * @param timeDifference join window interval
+     * @throws IllegalArgumentException if {@code timeDifference} is negative or can't be represented as {@code long milliseconds}
+     * @return a new JoinWindows object with the window definition and no grace period. Note that this means out of order records arriving after the window end will be dropped
+     */
     public static JoinWindows ofTimeDifferenceWithNoGrace(final Duration timeDifference) {
-        return new JoinWindows(timeDifference.toMillis(), timeDifference.toMillis(), 0L, true);
+        return new JoinWindows(timeDifference.toMillis(), timeDifference.toMillis(), NO_GRACE_PERIOD, true);
     }
 
-     /**
+    /**
      * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
      * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
      * the timestamp of the record from the primary stream.
      *
-     * @param timeDifference join window interval
+     * @param timeDifference
+     * @return a new JoinWindows object with the window definition with and grace period (uses old default of 24 hours)
      * @throws IllegalArgumentException if {@code timeDifference} is negative or can't be represented as {@code long milliseconds}
+     * @deprecated since 3.0 Use {@link #ofTimeDifferenceAndGrace(Duration, Duration)} instead

Review comment:
       nit: I think a `.` is missing: `since 3.0[.] Use`

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -92,32 +89,62 @@ private JoinWindows(final long beforeMs,
         if (beforeMs + afterMs < 0) {
             throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative.");
         }
+
+        if (graceMs < 0) {
+            throw new IllegalArgumentException("Grace period must not be negative.");
+        }
+
         this.afterMs = afterMs;
         this.beforeMs = beforeMs;
         this.graceMs = graceMs;
         this.enableSpuriousResultFix = enableSpuriousResultFix;
     }
 
+    /**
+     * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
+     * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
+     * the timestamp of the record from the primary stream. Using the method explicitly sets the grace period to
+     * the duration specified by {@code afterWindowEnd} which means that out of order records arriving
+     * after the window end will be dropped. The delay is defined as (stream_time - record_timestamp).

Review comment:
       `window end` -> `window closed`
   
   The window ends when `afterMs` has passed, but we keep the window open until `afterMs + grace`, which we call the "close time" of the window.
   
   > The delay is defined as (stream_time - record_timestamp).
   
   I think we can omit this?
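The distinction drawn in this comment between the window end (`afterMs`) and the close time (`afterMs + grace`) can be sketched for a single primary record as follows. This is a hypothetical model, not Kafka Streams code, and the rule that a window counts as closed once stream time reaches the close time is an assumption of the sketch.

```java
// Simplified model (not Kafka Streams code) of when a join window ends versus
// when it closes, for a primary record with timestamp primaryTs.
final class JoinWindowTimes {
    final long beforeMs;
    final long afterMs;
    final long graceMs;

    JoinWindowTimes(final long beforeMs, final long afterMs, final long graceMs) {
        this.beforeMs = beforeMs;
        this.afterMs = afterMs;
        this.graceMs = graceMs;
    }

    // Earliest secondary timestamp that can still join with the primary record.
    long windowStart(final long primaryTs) {
        return primaryTs - beforeMs;
    }

    // The window ends once afterMs has passed relative to the primary record.
    long windowEnd(final long primaryTs) {
        return primaryTs + afterMs;
    }

    // The window is kept open for the grace period beyond its end: the "close time".
    long closeTime(final long primaryTs) {
        return windowEnd(primaryTs) + graceMs;
    }

    // Assumption: the window counts as closed once stream time reaches the close time.
    boolean isClosed(final long primaryTs, final long streamTimeMs) {
        return streamTimeMs >= closeTime(primaryTs);
    }
}
```

For `new JoinWindowTimes(100, 100, 50)` and a primary record at 1000, the window ends at 1100 but only closes at 1150, so at stream time 1120 it has ended yet is still open.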

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
##########
@@ -108,13 +154,12 @@ public static SessionWindows with(final Duration inactivityGap) {
      * @param afterWindowEnd The grace period to admit out-of-order events to a window.
      * @return this updated builder
      * @throws IllegalArgumentException if the {@code afterWindowEnd} is negative of can't be represented as {@code long milliseconds}
+     * @deprecated since 3.0 Use {@link #ofInactivityGapAndGrace(Duration, Duration)} instead

Review comment:
       Missing `.` after `since 3.0`

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -92,32 +100,61 @@ private JoinWindows(final long beforeMs,
         if (beforeMs + afterMs < 0) {
             throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative.");
         }
+
+        if (graceMs < 0) {
+            throw new IllegalArgumentException("Grace period must not be negative.");
+        }
+
         this.afterMs = afterMs;
         this.beforeMs = beforeMs;
         this.graceMs = graceMs;
         this.enableSpuriousResultFix = enableSpuriousResultFix;
     }
 
+    /**
+     * Reject out-of-order events that are delayed more than {@code afterWindowEnd}
+     * after the end of its window.
+     * <p>
+     * Delay is defined as (stream_time - record_timestamp).
+     *
+     * @param timeDifference join window interval
+     * @param afterWindowEnd The grace period to admit out-of-order events to a window.
+     * @throws IllegalArgumentException if the {@code afterWindowEnd} is negative of can't be represented as {@code long milliseconds}
+     * @return A new JoinWindows object with the specified window definition and grace period
+     */
     public static JoinWindows ofTimeDifferenceAndGrace(final Duration timeDifference, final Duration afterWindowEnd) {
         return new JoinWindows(timeDifference.toMillis(), timeDifference.toMillis(), afterWindowEnd.toMillis(), true);
     }
 
+    /**
+     * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
+     * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
+     * the timestamp of the record from the primary stream. Using the method implicitly sets the grace period to zero
+     * which means that out of order records arriving after the window end will be dropped.

Review comment:
       `out-of-order`
   `window closed`

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -92,32 +89,62 @@ private JoinWindows(final long beforeMs,
         if (beforeMs + afterMs < 0) {
             throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative.");
         }
+
+        if (graceMs < 0) {
+            throw new IllegalArgumentException("Grace period must not be negative.");
+        }
+
         this.afterMs = afterMs;
         this.beforeMs = beforeMs;
         this.graceMs = graceMs;
         this.enableSpuriousResultFix = enableSpuriousResultFix;
     }
 
+    /**
+     * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
+     * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than

Review comment:
       `earlier or later` -> `before or after` (to avoid confusion with the term "late data")
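To make the `before or after` wording concrete, a minimal sketch of the joinability condition the javadoc describes: for a primary record at `primaryTs`, a secondary record qualifies if its timestamp lies between `primaryTs - beforeMs` and `primaryTs + afterMs`. The `ofTimeDifference*` factories in the diff pass `timeDifference` for both bounds. The class name is hypothetical, and treating both bounds as inclusive is an assumption of this sketch.

```java
// Hypothetical helper (not Kafka Streams code) expressing the join condition
// with "before" and "after" bounds relative to the primary record's timestamp.
final class JoinabilityCheck {
    private JoinabilityCheck() {
    }

    // Assumption: both bounds are inclusive.
    static boolean joinable(final long primaryTs, final long secondaryTs,
                            final long beforeMs, final long afterMs) {
        return secondaryTs >= primaryTs - beforeMs && secondaryTs <= primaryTs + afterMs;
    }

    // Symmetric case, as produced when timeDifference is used for both bounds.
    static boolean joinable(final long primaryTs, final long secondaryTs, final long timeDifferenceMs) {
        return joinable(primaryTs, secondaryTs, timeDifferenceMs, timeDifferenceMs);
    }
}
```

Phrasing the bounds as "before" and "after" keeps them distinct from "late" records, which are about arrival order relative to stream time rather than about timestamp distance.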

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -177,7 +204,9 @@ public long size() {
      * @param afterWindowEnd The grace period to admit out-of-order events to a window.
      * @return this updated builder
      * @throws IllegalArgumentException if the {@code afterWindowEnd} is negative of can't be represented as {@code long milliseconds}
+     * @deprecated since 3.0 Use {@link #ofTimeDifferenceWithNoGrace(Duration)} instead
      */
+    @Deprecated
     public JoinWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {

Review comment:
       I am just wondering about `return new JoinWindows(beforeMs, afterMs, afterWindowEndMs, false);` at the end of this method. Should it really _disable_ the fix by passing a hard-coded `false`? It might be better to replace `false` with `enableSpuriousResultFix`. If the old `of(size)` was called, the flag is already correctly set to `false`, but if one of the new `ofTimeDifferenceXxx()` methods was called, it might be weird to disable the fix when `grace()` is called.
   
   Or we could check whether the new API was used originally, and disallow calling `grace()` in that case:
   ```
   if (enableSpuriousResultFix) {
       throw new IllegalStateException("You can use grace() only if you create the JoinWindows using the of(size) method. If you use ofTimeDifferenceAndGrace() or ofTimeDifferenceWithNoGrace(), it is not allowed to change the grace period afterwards.");
   }
   ```
   
   \cc @ableegoldman WDYT?
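The two options raised here can be compared side by side in a small model. `JoinWindowSpec` and its methods are hypothetical and are not Kafka's `JoinWindows`; the 24-hour grace used by the old-style factory mirrors the "old default of 24 hours" mentioned in the deprecated javadoc quoted earlier in this thread.

```java
import java.time.Duration;

// Hypothetical, simplified model (not Kafka's JoinWindows) of the two options
// discussed for a grace() call on a window built with the new factories.
final class JoinWindowSpec {
    // Mirrors the "old default of 24 hours" mentioned in the deprecated javadoc.
    static final long OLD_DEFAULT_GRACE_MS = Duration.ofHours(24).toMillis();

    final long beforeMs;
    final long afterMs;
    final long graceMs;
    final boolean enableSpuriousResultFix;

    private JoinWindowSpec(final long beforeMs, final long afterMs, final long graceMs,
                           final boolean enableSpuriousResultFix) {
        this.beforeMs = beforeMs;
        this.afterMs = afterMs;
        this.graceMs = graceMs;
        this.enableSpuriousResultFix = enableSpuriousResultFix;
    }

    // Old-style factory: the fix stays disabled.
    static JoinWindowSpec of(final Duration timeDifference) {
        final long ms = timeDifference.toMillis();
        return new JoinWindowSpec(ms, ms, OLD_DEFAULT_GRACE_MS, false);
    }

    // New-style factory: the fix is enabled.
    static JoinWindowSpec ofTimeDifferenceWithNoGrace(final Duration timeDifference) {
        final long ms = timeDifference.toMillis();
        return new JoinWindowSpec(ms, ms, 0L, true);
    }

    // Option 1: carry the existing flag forward instead of hard-coding false.
    JoinWindowSpec graceKeepingFlag(final Duration afterWindowEnd) {
        return new JoinWindowSpec(beforeMs, afterMs, afterWindowEnd.toMillis(), enableSpuriousResultFix);
    }

    // Option 2: refuse to change the grace period of a window built with a new factory.
    JoinWindowSpec graceRejectingNewApi(final Duration afterWindowEnd) {
        if (enableSpuriousResultFix) {
            throw new IllegalStateException(
                "grace() may only be used on windows created with of(); "
                    + "use ofTimeDifferenceAndGrace() to set the grace period instead.");
        }
        return new JoinWindowSpec(beforeMs, afterMs, afterWindowEnd.toMillis(), false);
    }
}
```

Option 1 keeps `grace()` usable on every window but makes its effect on the flag depend on how the window was built; option 2 surfaces the misuse explicitly, at the cost of an exception at the call site.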

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
##########
@@ -78,23 +79,68 @@
     private SessionWindows(final long gapMs, final long graceMs) {
         this.gapMs = gapMs;
         this.graceMs = graceMs;
+
+        if (gapMs <= 0) {
+            throw new IllegalArgumentException("Gap time cannot be zero or negative.");
+        }
+
+        if (graceMs < 0) {
+            throw new IllegalArgumentException("Grace period must not be negative.");
+        }
     }
 
     /**
-     * Create a new window specification with the specified inactivity gap.
+     * Creates a new window specification with the specified inactivity gap.
+     * Using the method implicitly sets the grace period to zero which
+     * means that out of order records arriving after the window end will be dropped

Review comment:
       `out-of-order` (this may be a typo in other places, too). Can you fix it everywhere?
   
   `window end` -> `window closed` (same -- please also fix elsewhere)

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -92,32 +89,62 @@ private JoinWindows(final long beforeMs,
         if (beforeMs + afterMs < 0) {
             throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative.");
         }
+
+        if (graceMs < 0) {
+            throw new IllegalArgumentException("Grace period must not be negative.");
+        }
+
         this.afterMs = afterMs;
         this.beforeMs = beforeMs;
         this.graceMs = graceMs;
         this.enableSpuriousResultFix = enableSpuriousResultFix;
     }
 
+    /**
+     * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
+     * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
+     * the timestamp of the record from the primary stream. Using the method explicitly sets the grace period to
+     * the duration specified by {@code afterWindowEnd} which means that out of order records arriving
+     * after the window end will be dropped. The delay is defined as (stream_time - record_timestamp).
+     *
+     * @param timeDifference join window interval
+     * @param afterWindowEnd The grace period to admit out-of-order events to a window.
+     * @throws IllegalArgumentException if the {@code afterWindowEnd} is negative of can't be represented as {@code long milliseconds}

Review comment:
       `of` -> `or`

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -92,32 +89,62 @@ private JoinWindows(final long beforeMs,
         if (beforeMs + afterMs < 0) {
             throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative.");
         }
+
+        if (graceMs < 0) {
+            throw new IllegalArgumentException("Grace period must not be negative.");
+        }
+
         this.afterMs = afterMs;
         this.beforeMs = beforeMs;
         this.graceMs = graceMs;
         this.enableSpuriousResultFix = enableSpuriousResultFix;
     }
 
+    /**
+     * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
+     * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
+     * the timestamp of the record from the primary stream. Using the method explicitly sets the grace period to
+     * the duration specified by {@code afterWindowEnd} which means that out of order records arriving
+     * after the window end will be dropped. The delay is defined as (stream_time - record_timestamp).
+     *
+     * @param timeDifference join window interval
+     * @param afterWindowEnd The grace period to admit out-of-order events to a window.
+     * @throws IllegalArgumentException if the {@code afterWindowEnd} is negative of can't be represented as {@code long milliseconds}
+     * @return A new JoinWindows object with the specified window definition and grace period
+     */
     public static JoinWindows ofTimeDifferenceAndGrace(final Duration timeDifference, final Duration afterWindowEnd) {
         return new JoinWindows(timeDifference.toMillis(), timeDifference.toMillis(), afterWindowEnd.toMillis(), true);
     }
 
+    /**
+     * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
+     * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than

Review comment:
       `before or after`







[GitHub] [kafka] ableegoldman commented on a change in pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10926:
URL: https://github.com/apache/kafka/pull/10926#discussion_r659381726



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
##########
@@ -83,16 +92,52 @@ private TimeWindows(final long sizeMs, final long advanceMs, final long graceMs)
      * Tumbling windows are a special case of hopping windows with {@code advance == size}.
      *
      * @param size The size of the window
-     * @return a new window definition with default maintain duration of 1 day
+     * @return a new window definition with default no grace period
+     * @throws IllegalArgumentException if the specified window size is zero or negative or can't be represented as {@code long milliseconds}
+     */
+    public static TimeWindows ofSizeWithNoGrace(final Duration size) throws IllegalArgumentException {

Review comment:
       In fact, since this and `ofSizeAndGrace` are pretty much the same thing, differing only in the value of one config, they should have exactly the same javadocs.
   
   Only the `@return` and `@param` tags would be different, plus maybe one sentence in each that says either `"this window definition allows you to specify the grace period"` or `"this window definition does not require setting the grace period; note that this means out-of-order data arriving after the window end will be dropped, so think carefully about your requirements before using this in production"`, or something like that.
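The difference between the two javadocs comes down to when a late record is dropped. The sketch below is a simplified model of a tumbling window with window interval `[N * size, N * size + size)`. It assumes that a window is closed, and its records dropped, once stream time reaches the window end plus the grace period. That is how I understand windowed aggregations to behave, but Kafka Streams' exact boundary handling may differ. `TumblingGraceModel` and its methods are hypothetical names.

```java
/**
 * Hypothetical, simplified model (not Kafka code) of when a record is dropped
 * from a tumbling window because it arrived too late.
 */
final class TumblingGraceModel {

    private TumblingGraceModel() { }

    /** Start of the window [N * size, N * size + size) containing recordTs; assumes recordTs >= 0. */
    static long windowStart(final long recordTs, final long sizeMs) {
        return recordTs - (recordTs % sizeMs);
    }

    /**
     * The window is treated as closed once streamTime >= windowEnd + grace,
     * i.e. once windowEnd <= streamTime - grace; a record for a closed window is dropped.
     */
    static boolean isDropped(final long recordTs, final long streamTime,
                             final long sizeMs, final long graceMs) {
        final long windowEnd = windowStart(recordTs, sizeMs) + sizeMs;
        final long closeTime = streamTime - graceMs;
        return windowEnd <= closeTime;
    }
}
```

With a 10 ms window and no grace, a record stamped 5 is still accepted at stream time 9 but dropped at stream time 10. A 5 ms grace keeps it acceptable until stream time 15.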







[GitHub] [kafka] mjsax edited a comment on pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations

Posted by GitBox <gi...@apache.org>.
mjsax edited a comment on pull request #10926:
URL: https://github.com/apache/kafka/pull/10926#issuecomment-871807518


   Thanks for the PR @izzyacademy and congrats on your first contribution!
   
   Merged to meet the feature freeze deadline. Open comments can be addressed in follow up PRs.





[GitHub] [kafka] mjsax commented on pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #10926:
URL: https://github.com/apache/kafka/pull/10926#issuecomment-871807518


   Thanks for the PR @izzyacademy and congrats on your first contribution!
   
   Merged to meet the feature freeze deadline. Open comments can be addressed in follow up PRs.





[GitHub] [kafka] izzyacademy commented on a change in pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations

Posted by GitBox <gi...@apache.org>.
izzyacademy commented on a change in pull request #10926:
URL: https://github.com/apache/kafka/pull/10926#discussion_r661486188



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -177,7 +204,9 @@ public long size() {
      * @param afterWindowEnd The grace period to admit out-of-order events to a window.
      * @return this updated builder
      * @throws IllegalArgumentException if the {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds}
+     * @deprecated since 3.0 Use {@link #ofTimeDifferenceWithNoGrace(Duration)} instead
      */
+    @Deprecated
     public JoinWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {

Review comment:
       @mjsax I believe those came with your changes in the PR you merged into trunk.
   
   @mjsax, @ableegoldman, please let me know what you think and I can address that as well in the future JIRA ticket.
   
   It could require updating all the unit tests where `org.apache.kafka.streams.kstream.JoinWindows.grace()` is invoked, which looks like a significant undertaking given that today is the cutoff date for merging the feature.
   
   Once you let me know what you would like to do, I will create a JIRA ticket and work on that update, the javadocs and the unit tests.
   
   Thanks.







[GitHub] [kafka] izzyacademy commented on a change in pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations

Posted by GitBox <gi...@apache.org>.
izzyacademy commented on a change in pull request #10926:
URL: https://github.com/apache/kafka/pull/10926#discussion_r658404524



##########
File path: streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
##########
@@ -62,6 +62,7 @@
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.fail;
 
+@SuppressWarnings("deprecation")

Review comment:
       I have created the JIRA to track this https://issues.apache.org/jira/browse/KAFKA-12994







[GitHub] [kafka] izzyacademy commented on a change in pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations

Posted by GitBox <gi...@apache.org>.
izzyacademy commented on a change in pull request #10926:
URL: https://github.com/apache/kafka/pull/10926#discussion_r661801807



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -177,7 +204,9 @@ public long size() {
      * @param afterWindowEnd The grace period to admit out-of-order events to a window.
      * @return this updated builder
      * @throws IllegalArgumentException if the {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds}
+     * @deprecated since 3.0 Use {@link #ofTimeDifferenceWithNoGrace(Duration)} instead
      */
+    @Deprecated
     public JoinWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {

Review comment:
       @mjsax @ableegoldman @cadonna @showuon 
   
   These are the JIRA tickets to track the work to fix the unit tests and javadocs
   
   Improve Javadocs for API Changes from KIP-633
   https://issues.apache.org/jira/browse/KAFKA-13021
   
   Migrate all Tests to New API and Remove Suppression for Deprecation Warnings related to KIP-633
   https://issues.apache.org/jira/browse/KAFKA-12994







[GitHub] [kafka] showuon commented on a change in pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10926:
URL: https://github.com/apache/kafka/pull/10926#discussion_r659484507



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -92,32 +100,61 @@ private JoinWindows(final long beforeMs,
         if (beforeMs + afterMs < 0) {
             throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative.");
         }
+
+        if (graceMs < 0) {
+            throw new IllegalArgumentException("Grace period must not be negative.");
+        }
+
         this.afterMs = afterMs;
         this.beforeMs = beforeMs;
         this.graceMs = graceMs;
         this.enableSpuriousResultFix = enableSpuriousResultFix;
     }
 
+    /**
+     * Reject out-of-order events that are delayed more than {@code afterWindowEnd}
+     * after the end of its window.
+     * <p>
+     * Delay is defined as (stream_time - record_timestamp).
+     *
+     * @param timeDifference join window interval
+     * @param afterWindowEnd The grace period to admit out-of-order events to a window.
+     * @throws IllegalArgumentException if the {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds}
+     * @return A new JoinWindows object with the specified window definition and grace period
+     */
     public static JoinWindows ofTimeDifferenceAndGrace(final Duration timeDifference, final Duration afterWindowEnd) {
         return new JoinWindows(timeDifference.toMillis(), timeDifference.toMillis(), afterWindowEnd.toMillis(), true);
     }
 
+    /**
+     * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
+     * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
+     * the timestamp of the record from the primary stream. Using the method implicitly sets the grace period to zero
+     * which means that out of order records arriving after the window end will be dropped.

Review comment:
       I expected the javadoc for `ofTimeDifferenceWithNoGrace` to be mostly the same as that of `ofTimeDifferenceAndGrace`, except for the `grace` parameter. Is there any reason why they are different?
   
   Same comment to other places.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -83,6 +83,14 @@ protected JoinWindows(final JoinWindows joinWindows) {
         afterMs = joinWindows.afterMs;
         graceMs = joinWindows.graceMs;
         enableSpuriousResultFix = joinWindows.enableSpuriousResultFix;
+
+        if (beforeMs + afterMs < 0) {
+            throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative.");

Review comment:
       Could we make this constructor call the other overloaded constructor, to avoid duplicated code? i.e.
   ```java
   protected JoinWindows(final JoinWindows joinWindows) {
       this(joinWindows.beforeMs, joinWindows.afterMs, joinWindows.graceMs, joinWindows.enableSpuriousResultFix);
   }
   ```
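A runnable sketch of the suggested delegation follows. `JoinWindowsSketch` is a hypothetical stand-in, not the real `JoinWindows`. Its validation messages are copied from the diff above, and its main constructor is package-private instead of private only so that it can be exercised directly. Because the copy constructor routes through `this(...)`, the interval and grace checks live in exactly one place.

```java
/**
 * Hypothetical stand-in for JoinWindows (not the real class): the copy constructor
 * delegates to the validating constructor instead of duplicating its checks.
 */
class JoinWindowsSketch {

    final long beforeMs;
    final long afterMs;
    final long graceMs;
    final boolean enableSpuriousResultFix;

    JoinWindowsSketch(final long beforeMs, final long afterMs,
                      final long graceMs, final boolean enableSpuriousResultFix) {
        if (beforeMs + afterMs < 0) {
            throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative.");
        }
        if (graceMs < 0) {
            throw new IllegalArgumentException("Grace period must not be negative.");
        }
        this.beforeMs = beforeMs;
        this.afterMs = afterMs;
        this.graceMs = graceMs;
        this.enableSpuriousResultFix = enableSpuriousResultFix;
    }

    /** Copy constructor: reuses the validation above via this(...). */
    protected JoinWindowsSketch(final JoinWindowsSketch other) {
        this(other.beforeMs, other.afterMs, other.graceMs, other.enableSpuriousResultFix);
    }
}
```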

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
##########
@@ -108,13 +147,12 @@ public static SessionWindows with(final Duration inactivityGap) {
      * @param afterWindowEnd The grace period to admit out-of-order events to a window.
      * @return this updated builder
      * @throws IllegalArgumentException if the {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds}
+     * @deprecated since 3.0 Use {@link #ofInactivityGapAndGrace(Duration, Duration)} instead
      */
+    @Deprecated
     public SessionWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
         final long afterWindowEndMs = validateMillisecondDuration(afterWindowEnd, msgPrefix);
-        if (afterWindowEndMs < 0) {
-            throw new IllegalArgumentException("Grace period must not be negative.");
-        }

Review comment:
       It's nice that you moved this check into the constructor. But there is other validation you didn't move to the constructor, e.g. `validateMillisecondDuration(afterWindowEnd, msgPrefix)`.
   
   We used to have 2 ways to create `SessionWindows`: `grace` and `with`. Now we have added 2 more: `ofInactivityGapAndGrace` and `ofInactivityGapAndNoGrace`. We should also validate the parameters, either in the constructor or in each method.
   
   Same comment to other places.
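One way to make every factory validate consistently is to convert all `Duration` arguments through a single checked helper and then rely on one validating constructor. The sketch below makes the following assumptions:

* `SessionWindowsSketch`, `toMillisChecked`, `ofGapAndGrace` and `ofGapWithNoGrace` are hypothetical names and are not Kafka's `SessionWindows` API or its internal `validateMillisecondDuration` helper.
* A null argument should produce an `IllegalArgumentException`.
* The `ArithmeticException` that `Duration.toMillis()` throws on overflow should surface as an `IllegalArgumentException`.

```java
import java.time.Duration;

/**
 * Hypothetical sketch (not Kafka's SessionWindows): every factory converts its
 * Duration arguments with one checked helper and then uses one validating constructor.
 */
final class SessionWindowsSketch {

    static final long NO_GRACE_PERIOD = 0L;

    final long gapMs;
    final long graceMs;

    private SessionWindowsSketch(final long gapMs, final long graceMs) {
        if (gapMs <= 0) {
            throw new IllegalArgumentException("Gap time cannot be zero or negative.");
        }
        if (graceMs < 0) {
            throw new IllegalArgumentException("Grace period must not be negative.");
        }
        this.gapMs = gapMs;
        this.graceMs = graceMs;
    }

    /** Hypothetical helper: rejects null and turns millisecond overflow into IllegalArgumentException. */
    static long toMillisChecked(final Duration duration, final String name) {
        if (duration == null) {
            throw new IllegalArgumentException(name + " must not be null.");
        }
        try {
            return duration.toMillis();
        } catch (final ArithmeticException e) {
            throw new IllegalArgumentException(name + " can't be represented as long milliseconds.", e);
        }
    }

    static SessionWindowsSketch ofGapAndGrace(final Duration gap, final Duration afterWindowEnd) {
        return new SessionWindowsSketch(toMillisChecked(gap, "inactivityGap"),
                                        toMillisChecked(afterWindowEnd, "afterWindowEnd"));
    }

    static SessionWindowsSketch ofGapWithNoGrace(final Duration gap) {
        return new SessionWindowsSketch(toMillisChecked(gap, "inactivityGap"), NO_GRACE_PERIOD);
    }
}
```

The same pattern applies to the `SlidingWindows` factories discussed below: conversion in one helper, range checks in one constructor.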

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
##########
@@ -89,18 +132,16 @@ private SlidingWindows(final long timeDifferenceMs, final long graceMs) {
      * @param grace the grace period to admit out-of-order events to a window
      * @return a new window definition
      * @throws IllegalArgumentException if the specified window size is &lt; 0 or grace &lt; 0, or either can't be represented as {@code long milliseconds}
+     * @deprecated since 3.0 Use {@link #ofTimeDifferenceWithNoGrace(Duration)} or {@link #ofTimeDifferenceAndGrace(Duration, Duration)} instead
      */
+    @Deprecated
     public static SlidingWindows withTimeDifferenceAndGrace(final Duration timeDifference, final Duration grace) throws IllegalArgumentException {
         final String msgPrefixSize = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
         final long timeDifferenceMs = validateMillisecondDuration(timeDifference, msgPrefixSize);
-        if (timeDifferenceMs < 0) {
-            throw new IllegalArgumentException("Window time difference must not be negative.");
-        }
+
         final String msgPrefixGrace = prepareMillisCheckFailMsgPrefix(grace, "grace");
         final long graceMs = validateMillisecondDuration(grace, msgPrefixGrace);

Review comment:
       Same as mentioned above: this validation isn't handled in the new API.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -92,32 +100,61 @@ private JoinWindows(final long beforeMs,
         if (beforeMs + afterMs < 0) {
             throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative.");
         }
+
+        if (graceMs < 0) {
+            throw new IllegalArgumentException("Grace period must not be negative.");
+        }
+
         this.afterMs = afterMs;
         this.beforeMs = beforeMs;
         this.graceMs = graceMs;
         this.enableSpuriousResultFix = enableSpuriousResultFix;
     }
 
+    /**
+     * Reject out-of-order events that are delayed more than {@code afterWindowEnd}
+     * after the end of its window.
+     * <p>
+     * Delay is defined as (stream_time - record_timestamp).
+     *
+     * @param timeDifference join window interval
+     * @param afterWindowEnd The grace period to admit out-of-order events to a window.
+     * @throws IllegalArgumentException if the {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds}
+     * @return A new JoinWindows object with the specified window definition and grace period
+     */
     public static JoinWindows ofTimeDifferenceAndGrace(final Duration timeDifference, final Duration afterWindowEnd) {
         return new JoinWindows(timeDifference.toMillis(), timeDifference.toMillis(), afterWindowEnd.toMillis(), true);
     }
 
+    /**
+     * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
+     * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
+     * the timestamp of the record from the primary stream. Using the method implicitly sets the grace period to zero
+     * which means that out of order records arriving after the window end will be dropped.
+     *
+     * @param timeDifference join window interval
+     * @throws IllegalArgumentException if {@code timeDifference} is negative or can't be represented as {@code long milliseconds}
+     * @return a new JoinWindows object with the window definition and no grace period. Note that this means out of order records arriving after the window end will be dropped
+     */
     public static JoinWindows ofTimeDifferenceWithNoGrace(final Duration timeDifference) {
-        return new JoinWindows(timeDifference.toMillis(), timeDifference.toMillis(), 0L, true);
+        return new JoinWindows(timeDifference.toMillis(), timeDifference.toMillis(), NO_GRACE_PERIOD, true);
     }
 
-     /**
+    /**
      * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
      * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
      * the timestamp of the record from the primary stream.
      *
-     * @param timeDifference join window interval
+     * @param timeDifference
+     * @return a new JoinWindows object with the window definition and a grace period (uses old default of 24 hours)
      * @throws IllegalArgumentException if {@code timeDifference} is negative or can't be represented as {@code long milliseconds}
+     * @deprecated since 3.0 Use {@link #ofTimeDifferenceAndGrace(Duration, Duration)} instead

Review comment:
       Please add a period after `@deprecated since 3.0`. i.e.
   `@deprecated since 3.0. Use {@link #ofTimeDifferenceAndGrace(Duration, Duration)} instead`
   
   Same comments to below identical places.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
##########
@@ -40,7 +40,11 @@
 
     // By default grace period is 24 hours for all windows,
     // in other words we allow out-of-order data for up to a day
-    protected static final long DEFAULT_GRACE_PERIOD_MS = 24 * 60 * 60 * 1000L;
+    // This behavior is now deprecated
+    protected static final long DEPRECATED_OLD_24_HR_GRACE_PERIOD = 24 * 60 * 60 * 1000L;

Review comment:
       This naming is confusing to users. It will make users think that a 24-hour grace period cannot be used anymore, and I don't think that's what we want. 24 hours is still fine to use if users believe that's what they need. In other words, we don't **deprecate** the 24-hour grace period; we just no longer use it as the default value, so we should not name it with words like "deprecated" or "old".
   
   Correct me if I'm wrong, @izzyacademy @ableegoldman. Thank you
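The sketch below illustrates the point that a one-day grace period stays a valid explicit choice: only the implicit default goes away. Users of the new API would pass it explicitly, for example via `TimeWindows.ofSizeAndGrace(size, Duration.ofHours(24))`. The holder class `GracePeriodConstants` and its method are hypothetical. The constant name follows the `DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD` suggestion in the reply to this comment.

```java
import java.time.Duration;

/** Hypothetical constants holder illustrating the naming discussion (not Kafka's Windows class). */
final class GracePeriodConstants {

    private GracePeriodConstants() { }

    /** Explicit "no grace" value, so 0 is not a magic number at call sites. */
    static final long NO_GRACE_PERIOD = 0L;

    /** The former implicit default; "default" in the name says only the defaulting is deprecated. */
    static final long DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD = 24 * 60 * 60 * 1000L;

    /** A user who still wants a day of grace can simply ask for it explicitly. */
    static long explicitOneDayGraceMs() {
        return Duration.ofHours(24).toMillis();
    }
}
```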







[GitHub] [kafka] mjsax merged pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations

Posted by GitBox <gi...@apache.org>.
mjsax merged pull request #10926:
URL: https://github.com/apache/kafka/pull/10926


   





[GitHub] [kafka] ableegoldman commented on a change in pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10926:
URL: https://github.com/apache/kafka/pull/10926#discussion_r675292037



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
##########
@@ -40,7 +40,11 @@
 
     // By default grace period is 24 hours for all windows,
     // in other words we allow out-of-order data for up to a day
-    protected static final long DEFAULT_GRACE_PERIOD_MS = 24 * 60 * 60 * 1000L;
+    // This behavior is now deprecated
+    protected static final long DEPRECATED_OLD_24_HR_GRACE_PERIOD = 24 * 60 * 60 * 1000L;

Review comment:
       Yeah, it's an internal config, so I hope users wouldn't infer from the name what they can and can't use. That said, it does appear in these classes, which are themselves public, so users are still going to see it. But the important thing is that it makes sense to us, the devs, who will actually be using it. I think `DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD` is a bit clearer; we just need to sneak the word "default" in there somewhere, imo.







[GitHub] [kafka] ableegoldman commented on a change in pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10926:
URL: https://github.com/apache/kafka/pull/10926#discussion_r675285980



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -177,7 +204,9 @@ public long size() {
      * @param afterWindowEnd The grace period to admit out-of-order events to a window.
      * @return this updated builder
      * @throws IllegalArgumentException if the {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds}
+     * @deprecated since 3.0 Use {@link #ofTimeDifferenceWithNoGrace(Duration)} instead
      */
+    @Deprecated
     public JoinWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {

Review comment:
       Personally, I think it makes sense to just disallow calling `ofTimeDifferenceAndGrace(...).grace(...)` entirely; this seems like abusing the API.
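The sketch below shows one possible way to enforce this. It is not necessarily what the follow-up PR implemented. The object remembers whether a new-style factory already fixed the grace period, and the deprecated `grace()` setter fails fast in that case. `GuardedJoinWindows` is hypothetical. Its arguments are plain milliseconds rather than `Duration`s to keep the example short, and the legacy default value in `of()` is illustrative only.

```java
/**
 * Hypothetical sketch (not Kafka's JoinWindows): grace() is rejected when the grace
 * period was already set by a new-style factory such as ofTimeDifferenceAndGrace.
 */
final class GuardedJoinWindows {

    final long timeDifferenceMs;
    final long graceMs;
    private final boolean graceSetByFactory;

    private GuardedJoinWindows(final long timeDifferenceMs, final long graceMs, final boolean graceSetByFactory) {
        this.timeDifferenceMs = timeDifferenceMs;
        this.graceMs = graceMs;
        this.graceSetByFactory = graceSetByFactory;
    }

    /** New-style factory: the grace period is part of the definition. */
    static GuardedJoinWindows ofTimeDifferenceAndGrace(final long timeDifferenceMs, final long graceMs) {
        return new GuardedJoinWindows(timeDifferenceMs, graceMs, true);
    }

    /** Old-style factory: grace left at a legacy default (24 h here, for illustration only). */
    static GuardedJoinWindows of(final long timeDifferenceMs) {
        return new GuardedJoinWindows(timeDifferenceMs, 24 * 60 * 60 * 1000L, false);
    }

    /** Old-style setter: only allowed on windows created by the old factory. */
    GuardedJoinWindows grace(final long newGraceMs) {
        if (graceSetByFactory) {
            throw new IllegalStateException(
                "grace() cannot be called after the grace period was set by a factory method.");
        }
        return new GuardedJoinWindows(timeDifferenceMs, newGraceMs, false);
    }
}
```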







[GitHub] [kafka] izzyacademy commented on a change in pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations

Posted by GitBox <gi...@apache.org>.
izzyacademy commented on a change in pull request #10926:
URL: https://github.com/apache/kafka/pull/10926#discussion_r658351347



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
##########
@@ -71,6 +71,7 @@
 /**
  * Tests related to internal topics in streams
  */
+@SuppressWarnings("deprecation")

Review comment:
       Thanks @ableegoldman for the feedback. I will work on your recommendations on the Javadocs and I will create the JIRA issue to track the migration of integration and unit tests to the new APIs.
   
   As you have seen, the number of tests is significant, so that work deserves its own JIRA issue.
   
   I really appreciate all your support and guidance throughout the implementation of this KIP.







[GitHub] [kafka] izzyacademy commented on a change in pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations

Posted by GitBox <gi...@apache.org>.
izzyacademy commented on a change in pull request #10926:
URL: https://github.com/apache/kafka/pull/10926#discussion_r660952604



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
##########
@@ -108,13 +147,12 @@ public static SessionWindows with(final Duration inactivityGap) {
      * @param afterWindowEnd The grace period to admit out-of-order events to a window.
      * @return this updated builder
      * @throws IllegalArgumentException if the {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds}
+     * @deprecated since 3.0 Use {@link #ofInactivityGapAndGrace(Duration, Duration)} instead
      */
+    @Deprecated
     public SessionWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
         final long afterWindowEndMs = validateMillisecondDuration(afterWindowEnd, msgPrefix);
-        if (afterWindowEndMs < 0) {
-            throw new IllegalArgumentException("Grace period must not be negative.");
-        }

Review comment:
       Thanks for this feedback, @showuon. I have updated the code to share/reuse constructors. I think @ableegoldman will like that as well.







[GitHub] [kafka] izzyacademy commented on a change in pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations

Posted by GitBox <gi...@apache.org>.
izzyacademy commented on a change in pull request #10926:
URL: https://github.com/apache/kafka/pull/10926#discussion_r658403881



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
##########
@@ -78,6 +82,43 @@
     private SlidingWindows(final long timeDifferenceMs, final long graceMs) {
         this.timeDifferenceMs = timeDifferenceMs;
         this.graceMs = graceMs;
+
+        if (timeDifferenceMs < 0) {
+            throw new IllegalArgumentException("Window time difference must not be negative.");
+        }
+
+        if (graceMs < 0) {
+            throw new IllegalArgumentException("Window grace period must not be negative.");
+        }
+    }
+
+    /**
+     * Return a window definition with the window size

Review comment:
       This has been addressed







[GitHub] [kafka] ableegoldman commented on a change in pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10926:
URL: https://github.com/apache/kafka/pull/10926#discussion_r675296678



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -177,7 +204,9 @@ public long size() {
      * @param afterWindowEnd The grace period to admit out-of-order events to a window.
      * @return this updated builder
      * @throws IllegalArgumentException if the {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds}
+     * @deprecated since 3.0 Use {@link #ofTimeDifferenceWithNoGrace(Duration)} instead
      */
+    @Deprecated
     public JoinWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {

Review comment:
       I picked up all the non-testing followup work in this PR so we could try to get it into 3.0: https://github.com/apache/kafka/pull/11114







[GitHub] [kafka] cadonna commented on a change in pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10926:
URL: https://github.com/apache/kafka/pull/10926#discussion_r662830942



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
##########
@@ -81,18 +90,63 @@ private TimeWindows(final long sizeMs, final long advanceMs, final long graceMs)
      * <p>
      * This provides the semantics of tumbling windows, which are fixed-sized, gap-less, non-overlapping windows.
      * Tumbling windows are a special case of hopping windows with {@code advance == size}.
+     * Using the method implicitly sets the grace period to zero, which means
+     * that out-of-order records arriving after the window end will be dropped.
      *
      * @param size The size of the window
-     * @return a new window definition with default maintain duration of 1 day
+     * @return a new window definition with default no grace period. Note that this means out of order records arriving after the window end will be dropped
+     * @throws IllegalArgumentException if the specified window size is zero or negative or can't be represented as {@code long milliseconds}
+     */
+    public static TimeWindows ofSizeWithNoGrace(final Duration size) throws IllegalArgumentException {
+        return ofSizeAndGrace(size, ofMillis(NO_GRACE_PERIOD));
+    }
+
+    /**
+     * Return a window definition with the given window size, and with the advance interval being equal to the window
+     * size.
+     * The time interval represented by the N-th window is: {@code [N * size, N * size + size)}.
+     * <p>
+     * This provides the semantics of tumbling windows, which are fixed-sized, gap-less, non-overlapping windows.
+     * Tumbling windows are a special case of hopping windows with {@code advance == size}.
+     * Using the method explicitly sets the grace period to the duration specified by {@code afterWindowEnd}, which means
+     * that only out-of-order records arriving more than {@code afterWindowEnd} after the window end will be dropped.
+     *
+     * <p>
+     * Delay is defined as (stream_time - record_timestamp).
+     *
+     * @param size The size of the window. Must be larger than zero
+     * @param afterWindowEnd The grace period to admit out-of-order events to a window. Must be non-negative.
+     * @return a TimeWindows object with the specified size and the specified grace period
+     * @throws IllegalArgumentException if {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds}
+     */
+    public static TimeWindows ofSizeAndGrace(final Duration size, final Duration afterWindowEnd)
+            throws IllegalArgumentException {
+
+        final long sizeMs = size.toMillis();
+        final long afterWindowEndMs = afterWindowEnd.toMillis();
+
+        return new TimeWindows(sizeMs, sizeMs, afterWindowEndMs);
+    }
+
+    /**
+     * Return a window definition with the given window size, and with the advance interval being equal to the window
+     * size.
+     * The time interval represented by the N-th window is: {@code [N * size, N * size + size)}.
+     * <p>
+     * This provides the semantics of tumbling windows, which are fixed-sized, gap-less, non-overlapping windows.
+     * Tumbling windows are a special case of hopping windows with {@code advance == size}.
+     *
+     * @param size The size of the window
+     * @return a new window definition without specifying the grace period (uses old default of 24 hours)
      * @throws IllegalArgumentException if the specified window size is zero or negative or can't be represented as {@code long milliseconds}
+     * @deprecated since 3.0 Use {@link #ofSizeWithNoGrace(Duration)} instead
      */
+    @Deprecated
     public static TimeWindows of(final Duration size) throws IllegalArgumentException {
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(size, "size");
         final long sizeMs = validateMillisecondDuration(size, msgPrefix);
-        if (sizeMs <= 0) {
-            throw new IllegalArgumentException("Window size (sizeMs) must be larger than zero.");
-        }
-        return new TimeWindows(sizeMs, sizeMs, DEFAULT_GRACE_PERIOD_MS);
+
+        return new TimeWindows(sizeMs, sizeMs, DEPRECATED_OLD_24_HR_GRACE_PERIOD);

Review comment:
       This is not completely compatible with the behavior of older Streams apps. See #10953 for more details.
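My understanding of the incompatibility is an assumption and should be checked against #10953. Before KIP-633, a window created without `grace()` appears to have derived its grace period from a one-day retention. That would make the effective default `max(24h - size, 0)` rather than a flat 24 hours. The sketch below computes that value. `LegacyGraceDefault` is a hypothetical name.

```java
/**
 * Hypothetical sketch: a backward-compatible default grace for the deprecated
 * TimeWindows.of(size), assuming the old default was "one day of retention",
 * so the grace shrinks as the window grows and never goes below zero.
 */
final class LegacyGraceDefault {

    private LegacyGraceDefault() { }

    static final long DEFAULT_24_HR_MS = 24 * 60 * 60 * 1000L;

    static long legacyDefaultGraceMs(final long sizeMs) {
        return Math.max(DEFAULT_24_HR_MS - sizeMs, 0L);
    }
}
```

Under this assumption, a 1-minute window keeps 23h59m of grace, and any window of a day or longer gets none.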




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] izzyacademy commented on a change in pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations

Posted by GitBox <gi...@apache.org>.
izzyacademy commented on a change in pull request #10926:
URL: https://github.com/apache/kafka/pull/10926#discussion_r659384531



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
##########
@@ -83,16 +92,52 @@ private TimeWindows(final long sizeMs, final long advanceMs, final long graceMs)
      * Tumbling windows are a special case of hopping windows with {@code advance == size}.
      *
      * @param size The size of the window
-     * @return a new window definition with default maintain duration of 1 day
+     * @return a new window definition with default no grace period
+     * @throws IllegalArgumentException if the specified window size is zero or negative or can't be represented as {@code long milliseconds}
+     */
+    public static TimeWindows ofSizeWithNoGrace(final Duration size) throws IllegalArgumentException {

Review comment:
       @ableegoldman 
   
   Makes sense. I will review the Javadocs again for all the API changes and apply this feedback to the new API methods for the KIP.
   
   







[GitHub] [kafka] izzyacademy commented on a change in pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations

Posted by GitBox <gi...@apache.org>.
izzyacademy commented on a change in pull request #10926:
URL: https://github.com/apache/kafka/pull/10926#discussion_r658404143



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
##########
@@ -83,16 +92,52 @@ private TimeWindows(final long sizeMs, final long advanceMs, final long graceMs)
      * Tumbling windows are a special case of hopping windows with {@code advance == size}.
      *
      * @param size The size of the window
-     * @return a new window definition with default maintain duration of 1 day
+     * @return a new window definition with default no grace period
+     * @throws IllegalArgumentException if the specified window size is zero or negative or can't be represented as {@code long milliseconds}
+     */
+    public static TimeWindows ofSizeWithNoGrace(final Duration size) throws IllegalArgumentException {
+        return ofSizeAndGrace(size, ofMillis(NO_GRACE_PERIOD));
+    }
+
+    /**
+     * Reject out-of-order events that arrive more than {@code millisAfterWindowEnd} after the end of its window.

Review comment:
       I have addressed this in the latest commits.







[GitHub] [kafka] izzyacademy commented on a change in pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations

Posted by GitBox <gi...@apache.org>.
izzyacademy commented on a change in pull request #10926:
URL: https://github.com/apache/kafka/pull/10926#discussion_r660953456



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -92,32 +100,61 @@ private JoinWindows(final long beforeMs,
         if (beforeMs + afterMs < 0) {
             throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative.");
         }
+
+        if (graceMs < 0) {
+            throw new IllegalArgumentException("Grace period must not be negative.");
+        }
+
         this.afterMs = afterMs;
         this.beforeMs = beforeMs;
         this.graceMs = graceMs;
         this.enableSpuriousResultFix = enableSpuriousResultFix;
     }
 
+    /**
+     * Reject out-of-order events that are delayed more than {@code afterWindowEnd}
+     * after the end of its window.
+     * <p>
+     * Delay is defined as (stream_time - record_timestamp).
+     *
+     * @param timeDifference join window interval
+     * @param afterWindowEnd The grace period to admit out-of-order events to a window.
+     * @throws IllegalArgumentException if the {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds}
+     * @return A new JoinWindows object with the specified window definition and grace period
+     */
     public static JoinWindows ofTimeDifferenceAndGrace(final Duration timeDifference, final Duration afterWindowEnd) {
         return new JoinWindows(timeDifference.toMillis(), timeDifference.toMillis(), afterWindowEnd.toMillis(), true);
     }
 
+    /**
+     * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
+     * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
+     * the timestamp of the record from the primary stream. Using the method implicitly sets the grace period to zero
+     * which means that out of order records arriving after the window end will be dropped.

Review comment:
       Upon review, I have updated the javadocs, but I don't think they can be identical in wording.
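The joinability rule described in this javadoc can be sketched as follows. This is a minimal, hypothetical helper (`JoinabilitySketch` is not a Kafka Streams class). It assumes, as `ofTimeDifferenceAndGrace` does in the diff above, that `beforeMs` and `afterMs` are both set to `timeDifference.toMillis()`, and it treats both bounds as inclusive, reading "max `timeDifference` earlier or later" literally. Key matching and grace-period handling are left out.

```java
import java.time.Duration;

/**
 * Hypothetical helper (not part of Kafka Streams) modelling the symmetric join window
 * created when beforeMs == afterMs == timeDifference.toMillis().
 * Only records with the same key are ever compared; key matching is omitted here.
 */
final class JoinabilitySketch {
    private JoinabilitySketch() { }

    /**
     * Returns true if secondaryTs is at most timeDifference earlier or later than primaryTs.
     * Both bounds are treated as inclusive (an assumption based on the javadoc wording).
     */
    static boolean isJoinable(final long primaryTs, final long secondaryTs, final Duration timeDifference) {
        final long beforeMs = timeDifference.toMillis();
        final long afterMs = timeDifference.toMillis();
        return secondaryTs >= primaryTs - beforeMs && secondaryTs <= primaryTs + afterMs;
    }
}
```

For example, with a 500 ms time difference, `isJoinable(1_000, 1_400, Duration.ofMillis(500))` is `true`, while `isJoinable(1_000, 1_600, Duration.ofMillis(500))` is `false`.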







[GitHub] [kafka] izzyacademy commented on a change in pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations

Posted by GitBox <gi...@apache.org>.
izzyacademy commented on a change in pull request #10926:
URL: https://github.com/apache/kafka/pull/10926#discussion_r661801807



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -177,7 +204,9 @@ public long size() {
      * @param afterWindowEnd The grace period to admit out-of-order events to a window.
      * @return this updated builder
      * @throws IllegalArgumentException if the {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds}
+     * @deprecated since 3.0 Use {@link #ofTimeDifferenceWithNoGrace(Duration)} instead
      */
+    @Deprecated
     public JoinWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {

Review comment:
       These are the JIRA tickets to track the work to fix the unit tests and javadocs:
   
   Improve Javadocs for API Changes from KIP-633
   https://issues.apache.org/jira/browse/KAFKA-13021
   
   Migrate all Tests to New API and Remove Suppression for Deprecation Warnings related to KIP-633
   https://issues.apache.org/jira/browse/KAFKA-12994







[GitHub] [kafka] ableegoldman commented on a change in pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10926:
URL: https://github.com/apache/kafka/pull/10926#discussion_r658345192



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
##########
@@ -83,16 +92,52 @@ private TimeWindows(final long sizeMs, final long advanceMs, final long graceMs)
      * Tumbling windows are a special case of hopping windows with {@code advance == size}.
      *
      * @param size The size of the window
-     * @return a new window definition with default maintain duration of 1 day
+     * @return a new window definition with default no grace period
+     * @throws IllegalArgumentException if the specified window size is zero or negative or can't be represented as {@code long milliseconds}
+     */
+    public static TimeWindows ofSizeWithNoGrace(final Duration size) throws IllegalArgumentException {
+        return ofSizeAndGrace(size, ofMillis(NO_GRACE_PERIOD));
+    }
+
+    /**
+     * Reject out-of-order events that arrive more than {@code millisAfterWindowEnd} after the end of its window.

Review comment:
       The javadocs here should at least contain everything in the `ofSizeWithNoGrace` javadocs, but maybe with a bit more on the grace period. Both of them create a new tumbling window definition.
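The window interval `[N * size, N * size + size)` quoted in these javadocs can be computed directly from a record's timestamp. The sketch below is hypothetical (`TumblingWindowSketch` is not part of Kafka Streams) and assumes non-negative timestamps and a positive window size.

```java
/**
 * Hypothetical helper (not Kafka Streams code) returning the tumbling window
 * [N * size, N * size + size) that contains a given timestamp.
 */
final class TumblingWindowSketch {
    private TumblingWindowSketch() { }

    /** Returns {start, end}: start is inclusive, end is exclusive. */
    static long[] windowFor(final long timestampMs, final long sizeMs) {
        if (sizeMs <= 0) {
            throw new IllegalArgumentException("Window size must be larger than zero.");
        }
        if (timestampMs < 0) {
            throw new IllegalArgumentException("Timestamp must not be negative.");
        }
        final long n = timestampMs / sizeMs;   // index N of the window holding this timestamp
        final long start = n * sizeMs;
        return new long[] {start, start + sizeMs};
    }
}
```

With a 5-second window, a record at 12345 ms falls into window N = 2, i.e. `[10000, 15000)`; a record at exactly 15000 ms starts the next window because the end bound is exclusive.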

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
##########
@@ -71,6 +71,7 @@
 /**
  * Tests related to internal topics in streams
  */
+@SuppressWarnings("deprecation")

Review comment:
       Ok, I see now that there are a lot of tests using the deprecated methods. We should absolutely migrate them all to the new APIs (of course we will be forced to once the deprecated ones are removed), but it's ok with me, and maybe even slightly preferable, to do this in a followup PR. That way we can keep this PR focused on the changes themselves and just the tests for the new APIs.
   
   Can you file a ticket to migrate all of the remaining tests over to the new APIs and remove the warning suppression? Then you can start working on that after this PR is merged, or in parallel with the review but on a separate branch.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -92,32 +100,58 @@ private JoinWindows(final long beforeMs,
         if (beforeMs + afterMs < 0) {
             throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative.");
         }
+
+        if (graceMs < 0) {
+            throw new IllegalArgumentException("Grace period must not be negative.");
+        }
+
         this.afterMs = afterMs;
         this.beforeMs = beforeMs;
         this.graceMs = graceMs;
         this.enableSpuriousResultFix = enableSpuriousResultFix;
     }
 
+    /**
+     * Reject out-of-order events that are delayed more than {@code afterWindowEnd}
+     * after the end of its window.
+     * <p>
+     * Delay is defined as (stream_time - record_timestamp).
+     *
+     * @param timeDifference join window interval
+     * @param afterWindowEnd The grace period to admit out-of-order events to a window.
+     * @throws IllegalArgumentException if the {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds}
+     */
     public static JoinWindows ofTimeDifferenceAndGrace(final Duration timeDifference, final Duration afterWindowEnd) {
         return new JoinWindows(timeDifference.toMillis(), timeDifference.toMillis(), afterWindowEnd.toMillis(), true);
     }
 
+    /**
+     * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
+     * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
+     * the timestamp of the record from the primary stream.
+     *
+     * @param timeDifference join window interval
+     * @throws IllegalArgumentException if {@code timeDifference} is negative or can't be represented as {@code long milliseconds}
+     */
     public static JoinWindows ofTimeDifferenceWithNoGrace(final Duration timeDifference) {
-        return new JoinWindows(timeDifference.toMillis(), timeDifference.toMillis(), 0L, true);
+        return new JoinWindows(timeDifference.toMillis(), timeDifference.toMillis(), NO_GRACE_PERIOD, true);
     }
 
-     /**
+    /**
      * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
      * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than
      * the timestamp of the record from the primary stream.
      *
-     * @param timeDifference join window interval
+     * @param timeDifference
+     * @return

Review comment:
       nit: either remove this line or fill it out, e.g. `@return a new JoinWindows specification with a 24-hour grace period`. It's probably good to make sure this javadoc is consistent across all the builder methods in this class, i.e. either all methods have the `@return` specified, or none of them do. Same for any of the other windows classes.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
##########
@@ -341,6 +341,7 @@ private void testReprocessingFromScratchAfterResetWithIntermediateUserTopic(fina
         }
     }
 
+    @SuppressWarnings("deprecation")

Review comment:
       Same here, and for all the tests really -- any test should be migrated to use the non-deprecated version of the API, unless it's explicitly testing the behavior of the old, now-deprecated, method

##########
File path: streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
##########
@@ -62,6 +62,7 @@
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.fail;
 
+@SuppressWarnings("deprecation")

Review comment:
       We should migrate this to use the new non-deprecated methods instead of just suppressing the warning.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
##########
@@ -83,16 +92,52 @@ private TimeWindows(final long sizeMs, final long advanceMs, final long graceMs)
      * Tumbling windows are a special case of hopping windows with {@code advance == size}.
      *
      * @param size The size of the window
-     * @return a new window definition with default maintain duration of 1 day
+     * @return a new window definition with default no grace period
+     * @throws IllegalArgumentException if the specified window size is zero or negative or can't be represented as {@code long milliseconds}
+     */
+    public static TimeWindows ofSizeWithNoGrace(final Duration size) throws IllegalArgumentException {

Review comment:
       We should call out explicitly that this is setting the grace period to 0, which means that out-of-order records arriving after the window end will be dropped. Otherwise it's too easy to just use this method without thinking any further about the grace period and what it means/whether you want it.
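To make the consequence of a zero grace period concrete, here is a minimal, hypothetical model (not Kafka Streams code) of a tumbling-window aggregation. It assumes stream time is the largest record timestamp seen so far and that a window `[start, end)` keeps accepting records only while `streamTime < end + grace`. This follows the "delay is defined as (stream_time - record_timestamp)" wording used in these javadocs, but the library's exact close condition may differ in edge cases.

```java
/**
 * Hypothetical model (not Kafka Streams code) of tumbling windows with a grace period.
 * Assumption: a window [start, end) accepts a record only while streamTime < end + grace,
 * where streamTime is the largest record timestamp observed so far.
 */
final class GracePeriodSketch {
    private final long sizeMs;
    private final long graceMs;
    private long streamTime = Long.MIN_VALUE;

    GracePeriodSketch(final long sizeMs, final long graceMs) {
        if (sizeMs <= 0) {
            throw new IllegalArgumentException("Window size must be larger than zero.");
        }
        if (graceMs < 0) {
            throw new IllegalArgumentException("Grace period must not be negative.");
        }
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
    }

    /** Returns true if the record (timestamp >= 0) lands in its window, false if it is dropped as too late. */
    boolean accept(final long timestampMs) {
        streamTime = Math.max(streamTime, timestampMs);   // stream time only moves forward
        final long windowEnd = (timestampMs / sizeMs) * sizeMs + sizeMs;
        return streamTime < windowEnd + graceMs;          // window closes once stream time reaches end + grace
    }
}
```

With 10 ms windows and timestamps 5, 12 and then 8: under zero grace the late record at 8 is dropped, because stream time (12) has already passed the end of its window `[0, 10)`; with a 5 ms grace period it is still admitted, since 12 < 10 + 5.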

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
##########
@@ -78,6 +82,43 @@
     private SlidingWindows(final long timeDifferenceMs, final long graceMs) {
         this.timeDifferenceMs = timeDifferenceMs;
         this.graceMs = graceMs;
+
+        if (timeDifferenceMs < 0) {
+            throw new IllegalArgumentException("Window time difference must not be negative.");
+        }
+
+        if (graceMs < 0) {
+            throw new IllegalArgumentException("Window grace period must not be negative.");
+        }
+    }
+
+    /**
+     * Return a window definition with the window size

Review comment:
       ```suggestion
        * Return a window definition with the window size and no grace period. Note that this means out of order records arriving after the window end will be dropped.
   ```
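The negative-value checks added to the `SlidingWindows` constructor, combined with the KIP-633 `WithNoGrace`/`AndGrace` factory pattern, can be sketched as below. `SlidingWindowsSketch` and its `toMillisOrThrow` helper are hypothetical. One detail worth noting: `Duration.toMillis()` throws `ArithmeticException` on overflow, not `IllegalArgumentException`, and several of the new factory methods in this PR call `toMillis()` directly. A helper like this is one way to honour a javadoc that promises `IllegalArgumentException` when a duration "can't be represented as long milliseconds".

```java
import java.time.Duration;

/**
 * Hypothetical stand-in for SlidingWindows (not the Kafka Streams class) showing
 * constructor validation plus KIP-633 style "WithNoGrace" / "AndGrace" factories.
 */
final class SlidingWindowsSketch {
    private final long timeDifferenceMs;
    private final long graceMs;

    private SlidingWindowsSketch(final long timeDifferenceMs, final long graceMs) {
        // Check the arguments first, then assign the fields.
        if (timeDifferenceMs < 0) {
            throw new IllegalArgumentException("Window time difference must not be negative.");
        }
        if (graceMs < 0) {
            throw new IllegalArgumentException("Window grace period must not be negative.");
        }
        this.timeDifferenceMs = timeDifferenceMs;
        this.graceMs = graceMs;
    }

    /** Zero grace: out-of-order records arriving after the window end are dropped. */
    static SlidingWindowsSketch ofTimeDifferenceWithNoGrace(final Duration timeDifference) {
        return ofTimeDifferenceAndGrace(timeDifference, Duration.ZERO);
    }

    static SlidingWindowsSketch ofTimeDifferenceAndGrace(final Duration timeDifference,
                                                         final Duration afterWindowEnd) {
        return new SlidingWindowsSketch(toMillisOrThrow(timeDifference, "timeDifference"),
                                        toMillisOrThrow(afterWindowEnd, "afterWindowEnd"));
    }

    /** Duration.toMillis() throws ArithmeticException on overflow; rethrow as IllegalArgumentException. */
    private static long toMillisOrThrow(final Duration duration, final String name) {
        try {
            return duration.toMillis();
        } catch (final ArithmeticException e) {
            throw new IllegalArgumentException(name + " can't be represented as long milliseconds", e);
        }
    }

    long timeDifferenceMs() {
        return timeDifferenceMs;
    }

    long gracePeriodMs() {
        return graceMs;
    }
}
```

Routing `ofTimeDifferenceWithNoGrace` through `ofTimeDifferenceAndGrace` keeps a single validation path, so the "no grace" variant cannot drift from the explicit one.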







[GitHub] [kafka] ableegoldman commented on a change in pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10926:
URL: https://github.com/apache/kafka/pull/10926#discussion_r659377314



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
##########
@@ -83,16 +92,52 @@ private TimeWindows(final long sizeMs, final long advanceMs, final long graceMs)
      * Tumbling windows are a special case of hopping windows with {@code advance == size}.
      *
      * @param size The size of the window
-     * @return a new window definition with default maintain duration of 1 day
+     * @return a new window definition with default no grace period
+     * @throws IllegalArgumentException if the specified window size is zero or negative or can't be represented as {@code long milliseconds}
+     */
+    public static TimeWindows ofSizeWithNoGrace(final Duration size) throws IllegalArgumentException {

Review comment:
       Call it out explicitly in the javadocs, I mean, i.e. in the description part, not just the `@return` (though it's good to have put it there too). It would be a good idea to describe what the grace period actually means in the javadocs of this method, as we do in `ofSizeAndGrace`, so that users know what they are losing by opting out of the grace period.







[GitHub] [kafka] izzyacademy commented on a change in pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations

Posted by GitBox <gi...@apache.org>.
izzyacademy commented on a change in pull request #10926:
URL: https://github.com/apache/kafka/pull/10926#discussion_r660948497



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
##########
@@ -40,7 +40,11 @@
 
     // By default grace period is 24 hours for all windows,
     // in other words we allow out-of-order data for up to a day
-    protected static final long DEFAULT_GRACE_PERIOD_MS = 24 * 60 * 60 * 1000L;
+    // This behavior is now deprecated
+    protected static final long DEPRECATED_OLD_24_HR_GRACE_PERIOD = 24 * 60 * 60 * 1000L;

Review comment:
       This is not a publicly available constant. It is only used internally by the implementers, so I think it is OK. The reasoning for deprecating the constant is explained thoroughly in the KIP, so I think they will understand. I can add more details to the Java comments to clarify. But thanks @showuon for bringing it up in the review.







[GitHub] [kafka] izzyacademy commented on a change in pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations

Posted by GitBox <gi...@apache.org>.
izzyacademy commented on a change in pull request #10926:
URL: https://github.com/apache/kafka/pull/10926#discussion_r660953683



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -83,6 +83,14 @@ protected JoinWindows(final JoinWindows joinWindows) {
         afterMs = joinWindows.afterMs;
         graceMs = joinWindows.graceMs;
         enableSpuriousResultFix = joinWindows.enableSpuriousResultFix;
+
+        if (beforeMs + afterMs < 0) {
+            throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative.");

Review comment:
       > Could we make this Constructor to call another overloaded constructor, to avoid duplicated codes? i.e.
   > 
   > ```java
   > protected JoinWindows(final JoinWindows joinWindows) {
   >     this(joinWindows.beforeMs, joinWindows.afterMs, joinWindows.graceMs, joinWindows.enableSpuriousResultFix);
   > }
   > ```
   
   This is done.
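The delegation suggested above keeps all of the argument checks in a single constructor. A minimal, self-contained sketch of the pattern follows; `CopyableJoinWindows` is a hypothetical stand-in for `JoinWindows` that keeps only the fields visible in this diff.

```java
/**
 * Hypothetical stand-in for JoinWindows (not the Kafka Streams class) keeping only the
 * fields visible in this diff, to show a copy constructor delegating to the validating one.
 */
class CopyableJoinWindows {
    protected final long beforeMs;
    protected final long afterMs;
    protected final long graceMs;
    protected final boolean enableSpuriousResultFix;

    protected CopyableJoinWindows(final long beforeMs,
                                  final long afterMs,
                                  final long graceMs,
                                  final boolean enableSpuriousResultFix) {
        if (beforeMs + afterMs < 0) {
            throw new IllegalArgumentException("Window interval (i.e., beforeMs+afterMs) must not be negative.");
        }
        if (graceMs < 0) {
            throw new IllegalArgumentException("Grace period must not be negative.");
        }
        this.beforeMs = beforeMs;
        this.afterMs = afterMs;
        this.graceMs = graceMs;
        this.enableSpuriousResultFix = enableSpuriousResultFix;
    }

    // Copy constructor: no checks of its own; it reuses the ones above via this(...).
    protected CopyableJoinWindows(final CopyableJoinWindows other) {
        this(other.beforeMs, other.afterMs, other.graceMs, other.enableSpuriousResultFix);
    }
}
```

Because the copy constructor goes through `this(...)`, any check added later to the main constructor automatically applies to copies as well.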







[GitHub] [kafka] izzyacademy commented on pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations

Posted by GitBox <gi...@apache.org>.
izzyacademy commented on pull request #10926:
URL: https://github.com/apache/kafka/pull/10926#issuecomment-870911243


   @showuon @ableegoldman @mjsax @cadonna 
   
   I have updated the PR with the requested changes. Please take a look when you have a moment. Thanks.





[GitHub] [kafka] cadonna commented on a change in pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10926:
URL: https://github.com/apache/kafka/pull/10926#discussion_r662830942



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
##########
@@ -81,18 +90,63 @@ private TimeWindows(final long sizeMs, final long advanceMs, final long graceMs)
      * <p>
      * This provides the semantics of tumbling windows, which are fixed-sized, gap-less, non-overlapping windows.
      * Tumbling windows are a special case of hopping windows with {@code advance == size}.
+     * Using the method implicitly sets the grace period to zero which means
+     * that out of order records arriving after the window end will be dropped
      *
      * @param size The size of the window
-     * @return a new window definition with default maintain duration of 1 day
+     * @return a new window definition with default no grace period. Note that this means out of order records arriving after the window end will be dropped
+     * @throws IllegalArgumentException if the specified window size is zero or negative or can't be represented as {@code long milliseconds}
+     */
+    public static TimeWindows ofSizeWithNoGrace(final Duration size) throws IllegalArgumentException {
+        return ofSizeAndGrace(size, ofMillis(NO_GRACE_PERIOD));
+    }
+
+    /**
+     * Return a window definition with the given window size, and with the advance interval being equal to the window
+     * size.
+     * The time interval represented by the N-th window is: {@code [N * size, N * size + size)}.
+     * <p>
+     * This provides the semantics of tumbling windows, which are fixed-sized, gap-less, non-overlapping windows.
+     * Tumbling windows are a special case of hopping windows with {@code advance == size}.
+     * Using the method explicitly sets the grace period to the duration specified by {@code afterWindowEnd}, which means
+     * that only out-of-order records arriving more than {@code afterWindowEnd} after the window end will be dropped.
+     *
+     * <p>
+     * Delay is defined as (stream_time - record_timestamp).
+     *
+     * @param size The size of the window. Must be larger than zero
+     * @param afterWindowEnd The grace period to admit out-of-order events to a window. Must be non-negative.
+     * @return a TimeWindows object with the specified size and the specified grace period
+     * @throws IllegalArgumentException if {@code afterWindowEnd} is negative or can't be represented as {@code long milliseconds}
+     */
+    public static TimeWindows ofSizeAndGrace(final Duration size, final Duration afterWindowEnd)
+            throws IllegalArgumentException {
+
+        final long sizeMs = size.toMillis();
+        final long afterWindowEndMs = afterWindowEnd.toMillis();
+
+        return new TimeWindows(sizeMs, sizeMs, afterWindowEndMs);
+    }
+
+    /**
+     * Return a window definition with the given window size, and with the advance interval being equal to the window
+     * size.
+     * The time interval represented by the N-th window is: {@code [N * size, N * size + size)}.
+     * <p>
+     * This provides the semantics of tumbling windows, which are fixed-sized, gap-less, non-overlapping windows.
+     * Tumbling windows are a special case of hopping windows with {@code advance == size}.
+     *
+     * @param size The size of the window
+     * @return a new window definition without specifying the grace period (uses old default of 24 hours)
      * @throws IllegalArgumentException if the specified window size is zero or negative or can't be represented as {@code long milliseconds}
+     * @deprecated since 3.0 Use {@link #ofSizeWithNoGrace(Duration)} instead
      */
+    @Deprecated
     public static TimeWindows of(final Duration size) throws IllegalArgumentException {
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(size, "size");
         final long sizeMs = validateMillisecondDuration(size, msgPrefix);
-        if (sizeMs <= 0) {
-            throw new IllegalArgumentException("Window size (sizeMs) must be larger than zero.");
-        }
-        return new TimeWindows(sizeMs, sizeMs, DEFAULT_GRACE_PERIOD_MS);
+
+        return new TimeWindows(sizeMs, sizeMs, DEPRECATED_OLD_24_HR_GRACE_PERIOD);

Review comment:
       This is not completely compatible with the behavior of older Streams apps. See #10953 for a fix and more details.







[GitHub] [kafka] izzyacademy commented on a change in pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations

Posted by GitBox <gi...@apache.org>.
izzyacademy commented on a change in pull request #10926:
URL: https://github.com/apache/kafka/pull/10926#discussion_r658404890



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
##########
@@ -341,6 +341,7 @@ private void testReprocessingFromScratchAfterResetWithIntermediateUserTopic(fina
         }
     }
 
+    @SuppressWarnings("deprecation")

Review comment:
       I have created the JIRA ticket to track this separate work item.







[GitHub] [kafka] izzyacademy commented on a change in pull request #10926: KAFKA-8613: KIP-633 New APIs for Controlling Grace Period for Windowed Operations

Posted by GitBox <gi...@apache.org>.
izzyacademy commented on a change in pull request #10926:
URL: https://github.com/apache/kafka/pull/10926#discussion_r661479999



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -92,32 +89,62 @@ private JoinWindows(final long beforeMs,
         if (beforeMs + afterMs < 0) {
             throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative.");
         }
+
+        if (graceMs < 0) {
+            throw new IllegalArgumentException("Grace period must not be negative.");
+        }
+
         this.afterMs = afterMs;
         this.beforeMs = beforeMs;
         this.graceMs = graceMs;
         this.enableSpuriousResultFix = enableSpuriousResultFix;
     }
 
+    /**
+     * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},
+     * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} earlier or later than

Review comment:
       Thanks @mjsax for the feedback. I will open a JIRA item to address the fix for the Javadocs.
   
   @ableegoldman @cadonna @showuon please can we approve it today so that we don't miss the cutoff?
   
   I will work on completing any javadocs and unit tests corrections/edits/additions by the end of next week.



