You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2019/01/30 08:12:04 UTC

[flink] branch master updated: [FLINK-11326] Allow negative offsets in window assigners

This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 53044a0  [FLINK-11326] Allow negative offsets in window assigners
53044a0 is described below

commit 53044a08236a6b3a34199f5bd65187fad9014e20
Author: Kezhu Wang <ke...@gmail.com>
AuthorDate: Tue Jan 22 09:04:58 2019 +0800

    [FLINK-11326] Allow negative offsets in window assigners
---
 .../assigners/SlidingEventTimeWindows.java         |  8 ++--
 .../assigners/SlidingProcessingTimeWindows.java    |  8 ++--
 .../assigners/TumblingEventTimeWindows.java        |  4 +-
 .../assigners/TumblingProcessingTimeWindows.java   |  4 +-
 .../windowing/SlidingEventTimeWindowsTest.java     | 52 ++++++++++++++++++--
 .../SlidingProcessingTimeWindowsTest.java          | 55 ++++++++++++++++++++--
 .../windowing/TumblingEventTimeWindowsTest.java    | 20 ++++++--
 .../TumblingProcessingTimeWindowsTest.java         | 25 ++++++++--
 8 files changed, 148 insertions(+), 28 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
index b574c17..5944181 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
@@ -54,8 +54,9 @@ public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow>
 	private final long offset;
 
 	protected SlidingEventTimeWindows(long size, long slide, long offset) {
-		if (offset < 0 || offset >= slide || size <= 0) {
-			throw new IllegalArgumentException("SlidingEventTimeWindows parameters must satisfy 0 <= offset < slide and size > 0");
+		if (Math.abs(offset) >= slide || size <= 0) {
+			throw new IllegalArgumentException("SlidingEventTimeWindows parameters must satisfy " +
+				"abs(offset) < slide and size > 0");
 		}
 
 		this.size = size;
@@ -130,8 +131,7 @@ public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow>
 	 * @return The time policy.
 	 */
 	public static SlidingEventTimeWindows of(Time size, Time slide, Time offset) {
-		return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(),
-			offset.toMilliseconds() % slide.toMilliseconds());
+		return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), offset.toMilliseconds());
 	}
 
 	@Override
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
index 78cc8b2..3aeb258 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
@@ -52,8 +52,9 @@ public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWin
 	private final long slide;
 
 	private SlidingProcessingTimeWindows(long size, long slide, long offset) {
-		if (offset < 0 || offset >= slide || size <= 0) {
-			throw new IllegalArgumentException("SlidingProcessingTimeWindows parameters must satisfy 0 <= offset < slide and size > 0");
+		if (Math.abs(offset) >= slide || size <= 0) {
+			throw new IllegalArgumentException("SlidingProcessingTimeWindows parameters must satisfy " +
+				"abs(offset) < slide and size > 0");
 		}
 
 		this.size = size;
@@ -123,8 +124,7 @@ public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWin
 	 * @return The time policy.
 	 */
 	public static SlidingProcessingTimeWindows of(Time size, Time slide, Time offset) {
-		return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds(),
-			offset.toMilliseconds() % slide.toMilliseconds());
+		return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), offset.toMilliseconds());
 	}
 
 	@Override
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
index 88710c2..30a49de 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
@@ -51,8 +51,8 @@ public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow>
 	private final long offset;
 
 	protected TumblingEventTimeWindows(long size, long offset) {
-		if (offset < 0 || offset >= size) {
-			throw new IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy 0 <= offset < size");
+		if (Math.abs(offset) >= size) {
+			throw new IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy abs(offset) < size");
 		}
 
 		this.size = size;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
index 6a401ef..11f6caa 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
@@ -49,8 +49,8 @@ public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWi
 	private final long offset;
 
 	private TumblingProcessingTimeWindows(long size, long offset) {
-		if (offset < 0 || offset >= size) {
-			throw new IllegalArgumentException("TumblingProcessingTimeWindows parameters must satisfy  0 <= offset < size");
+		if (Math.abs(offset) >= size) {
+			throw new IllegalArgumentException("TumblingProcessingTimeWindows parameters must satisfy abs(offset) < size");
 		}
 
 		this.size = size;
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java
index 95a8314..6cf63af 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java
@@ -105,6 +105,36 @@ public class SlidingEventTimeWindowsTest extends TestLogger {
 	}
 
 	@Test
+	public void testWindowAssignmentWithNegativeOffset() {
+		WindowAssigner.WindowAssignerContext mockContext =
+			mock(WindowAssigner.WindowAssignerContext.class);
+
+		SlidingEventTimeWindows assigner =
+			SlidingEventTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(1000), Time.milliseconds(-100));
+
+		assertThat(assigner.assignWindows("String", 0L, mockContext), containsInAnyOrder(
+			timeWindow(-4100, 900),
+			timeWindow(-3100, 1900),
+			timeWindow(-2100, 2900),
+			timeWindow(-1100, 3900),
+			timeWindow(-100, 4900)));
+
+		assertThat(assigner.assignWindows("String", 4899L, mockContext), containsInAnyOrder(
+			timeWindow(-100, 4900),
+			timeWindow(900, 5900),
+			timeWindow(1900, 6900),
+			timeWindow(2900, 7900),
+			timeWindow(3900, 8900)));
+
+		assertThat(assigner.assignWindows("String", 4900L, mockContext), containsInAnyOrder(
+			timeWindow(900, 5900),
+			timeWindow(1900, 6900),
+			timeWindow(2900, 7900),
+			timeWindow(3900, 8900),
+			timeWindow(4900, 9900)));
+	}
+
+	@Test
 	public void testTimeUnits() {
 		// sanity check with one other time unit
 
@@ -141,21 +171,35 @@ public class SlidingEventTimeWindowsTest extends TestLogger {
 			SlidingEventTimeWindows.of(Time.seconds(-2), Time.seconds(1));
 			fail("should fail");
 		} catch (IllegalArgumentException e) {
-			assertThat(e.toString(), containsString("0 <= offset < slide and size > 0"));
+			assertThat(e.toString(), containsString("abs(offset) < slide and size > 0"));
 		}
 
 		try {
 			SlidingEventTimeWindows.of(Time.seconds(2), Time.seconds(-1));
 			fail("should fail");
 		} catch (IllegalArgumentException e) {
-			assertThat(e.toString(), containsString("0 <= offset < slide and size > 0"));
+			assertThat(e.toString(), containsString("abs(offset) < slide and size > 0"));
+		}
+
+		try {
+			SlidingEventTimeWindows.of(Time.seconds(-20), Time.seconds(10), Time.seconds(-1));
+			fail("should fail");
+		} catch (IllegalArgumentException e) {
+			assertThat(e.toString(), containsString("abs(offset) < slide and size > 0"));
+		}
+
+		try {
+			SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(10), Time.seconds(-11));
+			fail("should fail");
+		} catch (IllegalArgumentException e) {
+			assertThat(e.toString(), containsString("abs(offset) < slide and size > 0"));
 		}
 
 		try {
-			SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(10), Time.seconds(-1));
+			SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(10), Time.seconds(11));
 			fail("should fail");
 		} catch (IllegalArgumentException e) {
-			assertThat(e.toString(), containsString("0 <= offset < slide and size > 0"));
+			assertThat(e.toString(), containsString("abs(offset) < slide and size > 0"));
 		}
 	}
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java
index 69b628a..5567a17 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java
@@ -112,6 +112,39 @@ public class SlidingProcessingTimeWindowsTest extends TestLogger {
 	}
 
 	@Test
+	public void testWindowAssignmentWithNegativeOffset() {
+		WindowAssigner.WindowAssignerContext mockContext =
+			mock(WindowAssigner.WindowAssignerContext.class);
+
+		SlidingProcessingTimeWindows assigner =
+			SlidingProcessingTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(1000), Time.milliseconds(-100));
+
+		when(mockContext.getCurrentProcessingTime()).thenReturn(0L);
+		assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), containsInAnyOrder(
+			timeWindow(-4100, 900),
+			timeWindow(-3100, 1900),
+			timeWindow(-2100, 2900),
+			timeWindow(-1100, 3900),
+			timeWindow(-100, 4900)));
+
+		when(mockContext.getCurrentProcessingTime()).thenReturn(4899L);
+		assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), containsInAnyOrder(
+			timeWindow(-100, 4900),
+			timeWindow(900, 5900),
+			timeWindow(1900, 6900),
+			timeWindow(2900, 7900),
+			timeWindow(3900, 8900)));
+
+		when(mockContext.getCurrentProcessingTime()).thenReturn(4900L);
+		assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), containsInAnyOrder(
+			timeWindow(900, 5900),
+			timeWindow(1900, 6900),
+			timeWindow(2900, 7900),
+			timeWindow(3900, 8900),
+			timeWindow(4900, 9900)));
+	}
+
+	@Test
 	public void testTimeUnits() {
 		// sanity check with one other time unit
 
@@ -151,21 +184,35 @@ public class SlidingProcessingTimeWindowsTest extends TestLogger {
 			SlidingProcessingTimeWindows.of(Time.seconds(-2), Time.seconds(1));
 			fail("should fail");
 		} catch (IllegalArgumentException e) {
-			assertThat(e.toString(), containsString("0 <= offset < slide and size > 0"));
+			assertThat(e.toString(), containsString("abs(offset) < slide and size > 0"));
 		}
 
 		try {
 			SlidingProcessingTimeWindows.of(Time.seconds(2), Time.seconds(-1));
 			fail("should fail");
 		} catch (IllegalArgumentException e) {
-			assertThat(e.toString(), containsString("0 <= offset < slide and size > 0"));
+			assertThat(e.toString(), containsString("abs(offset) < slide and size > 0"));
+		}
+
+		try {
+			SlidingProcessingTimeWindows.of(Time.seconds(-20), Time.seconds(10), Time.seconds(-1));
+			fail("should fail");
+		} catch (IllegalArgumentException e) {
+			assertThat(e.toString(), containsString("abs(offset) < slide and size > 0"));
+		}
+
+		try {
+			SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(10), Time.seconds(-11));
+			fail("should fail");
+		} catch (IllegalArgumentException e) {
+			assertThat(e.toString(), containsString("abs(offset) < slide and size > 0"));
 		}
 
 		try {
-			SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(10), Time.seconds(-1));
+			SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(10), Time.seconds(11));
 			fail("should fail");
 		} catch (IllegalArgumentException e) {
-			assertThat(e.toString(), containsString("0 <= offset < slide and size > 0"));
+			assertThat(e.toString(), containsString("abs(offset) < slide and size > 0"));
 		}
 	}
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java
index 9e4669f..0451c15 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java
@@ -69,6 +69,18 @@ public class TumblingEventTimeWindowsTest extends TestLogger {
 	}
 
 	@Test
+	public void testWindowAssignmentWithNegativeOffset() {
+		WindowAssigner.WindowAssignerContext mockContext =
+			mock(WindowAssigner.WindowAssignerContext.class);
+
+		TumblingEventTimeWindows assigner = TumblingEventTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(-100));
+
+		assertThat(assigner.assignWindows("String", 0L, mockContext), contains(timeWindow(-100, 4900)));
+		assertThat(assigner.assignWindows("String", 4899L, mockContext), contains(timeWindow(-100, 4900)));
+		assertThat(assigner.assignWindows("String", 4900L, mockContext), contains(timeWindow(4900, 9900)));
+	}
+
+	@Test
 	public void testTimeUnits() {
 		// sanity check with one other time unit
 
@@ -88,21 +100,21 @@ public class TumblingEventTimeWindowsTest extends TestLogger {
 			TumblingEventTimeWindows.of(Time.seconds(-1));
 			fail("should fail");
 		} catch (IllegalArgumentException e) {
-			assertThat(e.toString(), containsString("0 <= offset < size"));
+			assertThat(e.toString(), containsString("abs(offset) < size"));
 		}
 
 		try {
 			TumblingEventTimeWindows.of(Time.seconds(10), Time.seconds(20));
 			fail("should fail");
 		} catch (IllegalArgumentException e) {
-			assertThat(e.toString(), containsString("0 <= offset < size"));
+			assertThat(e.toString(), containsString("abs(offset) < size"));
 		}
 
 		try {
-			TumblingEventTimeWindows.of(Time.seconds(10), Time.seconds(-1));
+			TumblingEventTimeWindows.of(Time.seconds(10), Time.seconds(-11));
 			fail("should fail");
 		} catch (IllegalArgumentException e) {
-			assertThat(e.toString(), containsString("0 <= offset < size"));
+			assertThat(e.toString(), containsString("abs(offset) < size"));
 		}
 	}
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java
index a611fc0..1306252 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java
@@ -80,6 +80,23 @@ public class TumblingProcessingTimeWindowsTest extends TestLogger {
 	}
 
 	@Test
+	public void testWindowAssignmentWithNegativeOffset() {
+		WindowAssigner.WindowAssignerContext mockContext =
+			mock(WindowAssigner.WindowAssignerContext.class);
+
+		TumblingProcessingTimeWindows assigner = TumblingProcessingTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(-100));
+
+		when(mockContext.getCurrentProcessingTime()).thenReturn(100L);
+		assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(-100, 4900)));
+
+		when(mockContext.getCurrentProcessingTime()).thenReturn(4899L);
+		assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(-100, 4900)));
+
+		when(mockContext.getCurrentProcessingTime()).thenReturn(4900L);
+		assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(4900, 9900)));
+	}
+
+	@Test
 	public void testTimeUnits() {
 		// sanity check with one other time unit
 
@@ -104,21 +121,21 @@ public class TumblingProcessingTimeWindowsTest extends TestLogger {
 			TumblingProcessingTimeWindows.of(Time.seconds(-1));
 			fail("should fail");
 		} catch (IllegalArgumentException e) {
-			assertThat(e.toString(), containsString("0 <= offset < size"));
+			assertThat(e.toString(), containsString("abs(offset) < size"));
 		}
 
 		try {
 			TumblingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(20));
 			fail("should fail");
 		} catch (IllegalArgumentException e) {
-			assertThat(e.toString(), containsString("0 <= offset < size"));
+			assertThat(e.toString(), containsString("abs(offset) < size"));
 		}
 
 		try {
-			TumblingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(-1));
+			TumblingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(-11));
 			fail("should fail");
 		} catch (IllegalArgumentException e) {
-			assertThat(e.toString(), containsString("0 <= offset < size"));
+			assertThat(e.toString(), containsString("abs(offset) < size"));
 		}
 	}