You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2020/07/28 13:51:03 UTC

[flink] branch master updated: [FLINK-18281] Add window stagger to TumblingEventTimeWindow

This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 335c47e  [FLINK-18281] Add window stagger to TumblingEventTimeWindow
335c47e is described below

commit 335c47e11478358e8514e63ca807ea765ed9dd9a
Author: Niel Hu <ni...@uber.com>
AuthorDate: Fri Jun 12 11:55:05 2020 -0700

    [FLINK-18281] Add window stagger to TumblingEventTimeWindow
---
 .../assigners/TumblingEventTimeWindows.java        | 36 +++++++++++++++++-----
 .../windowing/assigners/TumblingTimeWindows.java   |  2 +-
 .../windowing/TumblingEventTimeWindowsTest.java    | 19 ++++++++++--
 3 files changed, 47 insertions(+), 10 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
index 30a49de..e1aabcd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
@@ -48,22 +48,30 @@ public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow>
 
 	private final long size;
 
-	private final long offset;
+	private final long globalOffset;
 
-	protected TumblingEventTimeWindows(long size, long offset) {
+	private Long staggerOffset = null;
+
+	private final WindowStagger windowStagger;
+
+	protected TumblingEventTimeWindows(long size, long offset, WindowStagger windowStagger) {
 		if (Math.abs(offset) >= size) {
 			throw new IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy abs(offset) < size");
 		}
 
 		this.size = size;
-		this.offset = offset;
+		this.globalOffset = offset;
+		this.windowStagger = windowStagger;
 	}
 
 	@Override
 	public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
 		if (timestamp > Long.MIN_VALUE) {
+			if (staggerOffset == null) {
+				staggerOffset = windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), size);
+			}
 			// Long.MIN_VALUE is currently assigned when no timestamp is present
-			long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
+			long start = TimeWindow.getWindowStartWithOffset(timestamp, (globalOffset + staggerOffset) % size, size);
 			return Collections.singletonList(new TimeWindow(start, start + size));
 		} else {
 			throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
@@ -90,7 +98,7 @@ public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow>
 	 * @return The time policy.
 	 */
 	public static TumblingEventTimeWindows of(Time size) {
-		return new TumblingEventTimeWindows(size.toMilliseconds(), 0);
+		return new TumblingEventTimeWindows(size.toMilliseconds(), 0, WindowStagger.ALIGNED);
 	}
 
 	/**
@@ -108,10 +116,24 @@ public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow>
 	 *
 	 * @param size The size of the generated windows.
 	 * @param offset The offset which window start would be shifted by.
-	 * @return The time policy.
 	 */
 	public static TumblingEventTimeWindows of(Time size, Time offset) {
-		return new TumblingEventTimeWindows(size.toMilliseconds(), offset.toMilliseconds());
+		return new TumblingEventTimeWindows(size.toMilliseconds(), offset.toMilliseconds(), WindowStagger.ALIGNED);
+	}
+
+
+	/**
+	 * Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns
+	 * elements to time windows based on the element timestamp, offset and a staggering offset,
+	 * depending on the staggering policy.
+	 *
+	 * @param size The size of the generated windows.
+	 * @param offset The globalOffset which window start would be shifted by.
+	 * @param windowStagger The utility that produces staggering offset in runtime.
+	 */
+	@PublicEvolving
+	public static TumblingEventTimeWindows of(Time size, Time offset, WindowStagger windowStagger) {
+		return new TumblingEventTimeWindows(size.toMilliseconds(), offset.toMilliseconds(), windowStagger);
 	}
 
 	@Override
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
index 589bce3..1ac4241 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
@@ -33,7 +33,7 @@ public class TumblingTimeWindows extends TumblingEventTimeWindows {
 	private static final long serialVersionUID = 1L;
 
 	private TumblingTimeWindows(long size) {
-		super(size, 0);
+		super(size, 0, WindowStagger.ALIGNED);
 	}
 
 	/**
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java
index d86e575..ba5ff2f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.assigners.WindowStagger;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
@@ -38,6 +39,7 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests for {@link TumblingEventTimeWindows}.
@@ -57,7 +59,20 @@ public class TumblingEventTimeWindowsTest extends TestLogger {
 	}
 
 	@Test
-	public void testWindowAssignmentWithOffset() {
+	public void testWindowAssignmentWithStagger() {
+		WindowAssigner.WindowAssignerContext mockContext =
+			mock(WindowAssigner.WindowAssignerContext.class);
+
+		TumblingEventTimeWindows assigner = TumblingEventTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(0), WindowStagger.NATURAL);
+
+		when(mockContext.getCurrentProcessingTime()).thenReturn(150L);
+		assertThat(assigner.assignWindows("String", 150L, mockContext), contains(timeWindow(150, 5150)));
+		assertThat(assigner.assignWindows("String", 5099L, mockContext), contains(timeWindow(150, 5150)));
+		assertThat(assigner.assignWindows("String", 5300L, mockContext), contains(timeWindow(5150, 10150)));
+	}
+
+	@Test
+	public void testWindowAssignmentWithGlobalOffset() {
 		WindowAssigner.WindowAssignerContext mockContext =
 				mock(WindowAssigner.WindowAssignerContext.class);
 
@@ -69,7 +84,7 @@ public class TumblingEventTimeWindowsTest extends TestLogger {
 	}
 
 	@Test
-	public void testWindowAssignmentWithNegativeOffset() {
+	public void testWindowAssignmentWithNegativeGlobalOffset() {
 		WindowAssigner.WindowAssignerContext mockContext =
 			mock(WindowAssigner.WindowAssignerContext.class);