You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2021/02/19 02:20:21 UTC
[kafka] branch trunk updated: KAFKA-9524: increase retention time
for window and grace periods longer than one day (#10091)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c8112b5 KAFKA-9524: increase retention time for window and grace periods longer than one day (#10091)
c8112b5 is described below
commit c8112b5ecdda6b62d34ad97fcebbf5c7fec3de53
Author: Marco Aurelio Lotz <lo...@gmail.com>
AuthorDate: Fri Feb 19 03:18:53 2021 +0100
KAFKA-9524: increase retention time for window and grace periods longer than one day (#10091)
Reviewers: Victoria Xia <vi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
.../java/org/apache/kafka/streams/kstream/TimeWindows.java | 11 ++++++++---
.../org/apache/kafka/streams/kstream/TimeWindowsTest.java | 11 ++++++++++-
2 files changed, 18 insertions(+), 4 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
index cd52dd5..9fd963a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
@@ -57,6 +57,8 @@ import static org.apache.kafka.streams.kstream.internals.WindowingDefaults.DEFAU
*/
public final class TimeWindows extends Windows<TimeWindow> {
+ private static final long EMPTY_GRACE_PERIOD = -1;
+
private final long maintainDurationMs;
/** The size of the windows in milliseconds. */
@@ -111,7 +113,7 @@ public final class TimeWindows extends Windows<TimeWindow> {
throw new IllegalArgumentException("Window size (sizeMs) must be larger than zero.");
}
// This is a static factory method, so we initialize grace and retention to the defaults.
- return new TimeWindows(sizeMs, sizeMs, -1, DEFAULT_RETENTION_MS);
+ return new TimeWindows(sizeMs, sizeMs, EMPTY_GRACE_PERIOD, DEFAULT_RETENTION_MS);
}
/**
@@ -214,7 +216,10 @@ public final class TimeWindows extends Windows<TimeWindow> {
// NOTE: in the future, when we remove maintainMs,
// we should default the grace period to 24h to maintain the default behavior,
// or we can default to (24h - size) if you want to be super accurate.
- return graceMs != -1 ? graceMs : maintainMs() - size();
+ if (graceMs != EMPTY_GRACE_PERIOD) {
+ return graceMs;
+ }
+ return Math.max(maintainDurationMs - sizeMs, 0);
}
/**
@@ -245,7 +250,7 @@ public final class TimeWindows extends Windows<TimeWindow> {
@Override
@Deprecated
public long maintainMs() {
- return Math.max(maintainDurationMs, sizeMs);
+ return Math.max(maintainDurationMs, sizeMs + gracePeriodMs());
}
@SuppressWarnings("deprecation") // removing segments from Windows will fix this
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
index 69b73c8..00e2b4c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
@@ -19,8 +19,10 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.junit.Test;
+import java.time.Duration;
import java.util.Map;
+import static java.time.Duration.ofDays;
import static java.time.Duration.ofMillis;
import static org.apache.kafka.streams.EqualityCheck.verifyEquality;
import static org.apache.kafka.streams.EqualityCheck.verifyInEquality;
@@ -53,12 +55,19 @@ public class TimeWindowsTest {
@SuppressWarnings("deprecation") // specifically testing deprecated APIs
@Test
- public void shouldUseWindowSizeAsRentitionTimeIfWindowSizeIsLargerThanDefaultRetentionTime() {
+ public void shouldUseWindowSizeAsRetentionTimeIfWindowSizeIsLargerThanDefaultRetentionTime() {
final long windowSize = 2 * TimeWindows.of(ofMillis(1)).maintainMs();
assertEquals(windowSize, TimeWindows.of(ofMillis(windowSize)).maintainMs());
}
@Test
+ public void shouldUseWindowSizeAndGraceAsRetentionTimeIfBothCombinedAreLargerThanDefaultRetentionTime() {
+ final Duration windowsSize = ofDays(1).minus(ofMillis(1));
+ final Duration gracePeriod = ofMillis(2);
+ assertEquals(windowsSize.toMillis() + gracePeriod.toMillis(), TimeWindows.of(windowsSize).grace(gracePeriod).maintainMs());
+ }
+
+ @Test
public void windowSizeMustNotBeZero() {
assertThrows(IllegalArgumentException.class, () -> TimeWindows.of(ofMillis(0)));
}