You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2020/07/28 13:51:03 UTC
[flink] branch master updated: [FLINK-18281] Add window stagger to
TumblingEventTimeWindow
This is an automated email from the ASF dual-hosted git repository.
aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 335c47e [FLINK-18281] Add window stagger to TumblingEventTimeWindow
335c47e is described below
commit 335c47e11478358e8514e63ca807ea765ed9dd9a
Author: Niel Hu <ni...@uber.com>
AuthorDate: Fri Jun 12 11:55:05 2020 -0700
[FLINK-18281] Add window stagger to TumblingEventTimeWindow
---
.../assigners/TumblingEventTimeWindows.java | 36 +++++++++++++++++-----
.../windowing/assigners/TumblingTimeWindows.java | 2 +-
.../windowing/TumblingEventTimeWindowsTest.java | 19 ++++++++++--
3 files changed, 47 insertions(+), 10 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
index 30a49de..e1aabcd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
@@ -48,22 +48,30 @@ public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow>
private final long size;
- private final long offset;
+ private final long globalOffset;
- protected TumblingEventTimeWindows(long size, long offset) {
+ private Long staggerOffset = null;
+
+ private final WindowStagger windowStagger;
+
+ protected TumblingEventTimeWindows(long size, long offset, WindowStagger windowStagger) {
if (Math.abs(offset) >= size) {
throw new IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy abs(offset) < size");
}
this.size = size;
- this.offset = offset;
+ this.globalOffset = offset;
+ this.windowStagger = windowStagger;
}
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
if (timestamp > Long.MIN_VALUE) {
+ if (staggerOffset == null) {
+ staggerOffset = windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), size);
+ }
// Long.MIN_VALUE is currently assigned when no timestamp is present
- long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
+ long start = TimeWindow.getWindowStartWithOffset(timestamp, (globalOffset + staggerOffset) % size, size);
return Collections.singletonList(new TimeWindow(start, start + size));
} else {
throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
@@ -90,7 +98,7 @@ public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow>
* @return The time policy.
*/
public static TumblingEventTimeWindows of(Time size) {
- return new TumblingEventTimeWindows(size.toMilliseconds(), 0);
+ return new TumblingEventTimeWindows(size.toMilliseconds(), 0, WindowStagger.ALIGNED);
}
/**
@@ -108,10 +116,24 @@ public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow>
*
* @param size The size of the generated windows.
* @param offset The offset which window start would be shifted by.
- * @return The time policy.
*/
public static TumblingEventTimeWindows of(Time size, Time offset) {
- return new TumblingEventTimeWindows(size.toMilliseconds(), offset.toMilliseconds());
+ return new TumblingEventTimeWindows(size.toMilliseconds(), offset.toMilliseconds(), WindowStagger.ALIGNED);
+ }
+
+
+ /**
+ * Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns
+ * elements to time windows based on the element timestamp, offset and a staggering offset,
+ * depending on the staggering policy.
+ *
+ * @param size The size of the generated windows.
+ * @param offset The globalOffset which window start would be shifted by.
+ * @param windowStagger The utility that produces staggering offset in runtime.
+ */
+ @PublicEvolving
+ public static TumblingEventTimeWindows of(Time size, Time offset, WindowStagger windowStagger) {
+ return new TumblingEventTimeWindows(size.toMilliseconds(), offset.toMilliseconds(), windowStagger);
}
@Override
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
index 589bce3..1ac4241 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
@@ -33,7 +33,7 @@ public class TumblingTimeWindows extends TumblingEventTimeWindows {
private static final long serialVersionUID = 1L;
private TumblingTimeWindows(long size) {
- super(size, 0);
+ super(size, 0, WindowStagger.ALIGNED);
}
/**
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java
index d86e575..ba5ff2f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.assigners.WindowStagger;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
@@ -38,6 +39,7 @@ import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
/**
* Tests for {@link TumblingEventTimeWindows}.
@@ -57,7 +59,20 @@ public class TumblingEventTimeWindowsTest extends TestLogger {
}
@Test
- public void testWindowAssignmentWithOffset() {
+ public void testWindowAssignmentWithStagger() {
+ WindowAssigner.WindowAssignerContext mockContext =
+ mock(WindowAssigner.WindowAssignerContext.class);
+
+ TumblingEventTimeWindows assigner = TumblingEventTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(0), WindowStagger.NATURAL);
+
+ when(mockContext.getCurrentProcessingTime()).thenReturn(150L);
+ assertThat(assigner.assignWindows("String", 150L, mockContext), contains(timeWindow(150, 5150)));
+ assertThat(assigner.assignWindows("String", 5099L, mockContext), contains(timeWindow(150, 5150)));
+ assertThat(assigner.assignWindows("String", 5300L, mockContext), contains(timeWindow(5150, 10150)));
+ }
+
+ @Test
+ public void testWindowAssignmentWithGlobalOffset() {
WindowAssigner.WindowAssignerContext mockContext =
mock(WindowAssigner.WindowAssignerContext.class);
@@ -69,7 +84,7 @@ public class TumblingEventTimeWindowsTest extends TestLogger {
}
@Test
- public void testWindowAssignmentWithNegativeOffset() {
+ public void testWindowAssignmentWithNegativeGlobalOffset() {
WindowAssigner.WindowAssignerContext mockContext =
mock(WindowAssigner.WindowAssignerContext.class);