You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2021/02/19 02:20:21 UTC

[kafka] branch trunk updated: KAFKA-9524: increase retention time for window and grace periods longer than one day (#10091)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c8112b5  KAFKA-9524: increase retention time for window and grace periods longer than one day (#10091)
c8112b5 is described below

commit c8112b5ecdda6b62d34ad97fcebbf5c7fec3de53
Author: Marco Aurelio Lotz <lo...@gmail.com>
AuthorDate: Fri Feb 19 03:18:53 2021 +0100

    KAFKA-9524: increase retention time for window and grace periods longer than one day (#10091)
    
    Reviewers: Victoria Xia <vi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
 .../java/org/apache/kafka/streams/kstream/TimeWindows.java    | 11 ++++++++---
 .../org/apache/kafka/streams/kstream/TimeWindowsTest.java     | 11 ++++++++++-
 2 files changed, 18 insertions(+), 4 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
index cd52dd5..9fd963a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
@@ -57,6 +57,8 @@ import static org.apache.kafka.streams.kstream.internals.WindowingDefaults.DEFAU
  */
 public final class TimeWindows extends Windows<TimeWindow> {
 
+    private static final long EMPTY_GRACE_PERIOD = -1;
+
     private final long maintainDurationMs;
 
     /** The size of the windows in milliseconds. */
@@ -111,7 +113,7 @@ public final class TimeWindows extends Windows<TimeWindow> {
             throw new IllegalArgumentException("Window size (sizeMs) must be larger than zero.");
         }
         // This is a static factory method, so we initialize grace and retention to the defaults.
-        return new TimeWindows(sizeMs, sizeMs, -1, DEFAULT_RETENTION_MS);
+        return new TimeWindows(sizeMs, sizeMs, EMPTY_GRACE_PERIOD, DEFAULT_RETENTION_MS);
     }
 
     /**
@@ -214,7 +216,10 @@ public final class TimeWindows extends Windows<TimeWindow> {
         // NOTE: in the future, when we remove maintainMs,
         // we should default the grace period to 24h to maintain the default behavior,
         // or we can default to (24h - size) if you want to be super accurate.
-        return graceMs != -1 ? graceMs : maintainMs() - size();
+        if (graceMs != EMPTY_GRACE_PERIOD) {
+            return graceMs;
+        }
+        return Math.max(maintainDurationMs - sizeMs, 0);
     }
 
     /**
@@ -245,7 +250,7 @@ public final class TimeWindows extends Windows<TimeWindow> {
     @Override
     @Deprecated
     public long maintainMs() {
-        return Math.max(maintainDurationMs, sizeMs);
+        return Math.max(maintainDurationMs, sizeMs + gracePeriodMs());
     }
 
     @SuppressWarnings("deprecation") // removing segments from Windows will fix this
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
index 69b73c8..00e2b4c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
@@ -19,8 +19,10 @@ package org.apache.kafka.streams.kstream;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.junit.Test;
 
+import java.time.Duration;
 import java.util.Map;
 
+import static java.time.Duration.ofDays;
 import static java.time.Duration.ofMillis;
 import static org.apache.kafka.streams.EqualityCheck.verifyEquality;
 import static org.apache.kafka.streams.EqualityCheck.verifyInEquality;
@@ -53,12 +55,19 @@ public class TimeWindowsTest {
 
     @SuppressWarnings("deprecation") // specifically testing deprecated APIs
     @Test
-    public void shouldUseWindowSizeAsRentitionTimeIfWindowSizeIsLargerThanDefaultRetentionTime() {
+    public void shouldUseWindowSizeAsRetentionTimeIfWindowSizeIsLargerThanDefaultRetentionTime() {
         final long windowSize = 2 * TimeWindows.of(ofMillis(1)).maintainMs();
         assertEquals(windowSize, TimeWindows.of(ofMillis(windowSize)).maintainMs());
     }
 
     @Test
+    public void shouldUseWindowSizeAndGraceAsRetentionTimeIfBothCombinedAreLargerThanDefaultRetentionTime() {
+        final Duration windowsSize = ofDays(1).minus(ofMillis(1));
+        final Duration gracePeriod = ofMillis(2);
+        assertEquals(windowsSize.toMillis() + gracePeriod.toMillis(), TimeWindows.of(windowsSize).grace(gracePeriod).maintainMs());
+    }
+
+    @Test
     public void windowSizeMustNotBeZero() {
         assertThrows(IllegalArgumentException.class, () -> TimeWindows.of(ofMillis(0)));
     }