You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2019/01/30 08:12:04 UTC
[flink] branch master updated: [FLINK-11326] Allow negative offsets in window assigners
This is an automated email from the ASF dual-hosted git repository.
aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 53044a0 [FLINK-11326] Allow negative offsets in window assigners
53044a0 is described below
commit 53044a08236a6b3a34199f5bd65187fad9014e20
Author: Kezhu Wang <ke...@gmail.com>
AuthorDate: Tue Jan 22 09:04:58 2019 +0800
[FLINK-11326] Allow negative offsets in window assigners
---
.../assigners/SlidingEventTimeWindows.java | 8 ++--
.../assigners/SlidingProcessingTimeWindows.java | 8 ++--
.../assigners/TumblingEventTimeWindows.java | 4 +-
.../assigners/TumblingProcessingTimeWindows.java | 4 +-
.../windowing/SlidingEventTimeWindowsTest.java | 52 ++++++++++++++++++--
.../SlidingProcessingTimeWindowsTest.java | 55 ++++++++++++++++++++--
.../windowing/TumblingEventTimeWindowsTest.java | 20 ++++++--
.../TumblingProcessingTimeWindowsTest.java | 25 ++++++++--
8 files changed, 148 insertions(+), 28 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
index b574c17..5944181 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
@@ -54,8 +54,9 @@ public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow>
private final long offset;
protected SlidingEventTimeWindows(long size, long slide, long offset) {
- if (offset < 0 || offset >= slide || size <= 0) {
- throw new IllegalArgumentException("SlidingEventTimeWindows parameters must satisfy 0 <= offset < slide and size > 0");
+ if (Math.abs(offset) >= slide || size <= 0) {
+ throw new IllegalArgumentException("SlidingEventTimeWindows parameters must satisfy " +
+ "abs(offset) < slide and size > 0");
}
this.size = size;
@@ -130,8 +131,7 @@ public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow>
* @return The time policy.
*/
public static SlidingEventTimeWindows of(Time size, Time slide, Time offset) {
- return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(),
- offset.toMilliseconds() % slide.toMilliseconds());
+ return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), offset.toMilliseconds());
}
@Override
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
index 78cc8b2..3aeb258 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
@@ -52,8 +52,9 @@ public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWin
private final long slide;
private SlidingProcessingTimeWindows(long size, long slide, long offset) {
- if (offset < 0 || offset >= slide || size <= 0) {
- throw new IllegalArgumentException("SlidingProcessingTimeWindows parameters must satisfy 0 <= offset < slide and size > 0");
+ if (Math.abs(offset) >= slide || size <= 0) {
+ throw new IllegalArgumentException("SlidingProcessingTimeWindows parameters must satisfy " +
+ "abs(offset) < slide and size > 0");
}
this.size = size;
@@ -123,8 +124,7 @@ public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWin
* @return The time policy.
*/
public static SlidingProcessingTimeWindows of(Time size, Time slide, Time offset) {
- return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds(),
- offset.toMilliseconds() % slide.toMilliseconds());
+ return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), offset.toMilliseconds());
}
@Override
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
index 88710c2..30a49de 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
@@ -51,8 +51,8 @@ public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow>
private final long offset;
protected TumblingEventTimeWindows(long size, long offset) {
- if (offset < 0 || offset >= size) {
- throw new IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy 0 <= offset < size");
+ if (Math.abs(offset) >= size) {
+ throw new IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy abs(offset) < size");
}
this.size = size;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
index 6a401ef..11f6caa 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
@@ -49,8 +49,8 @@ public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWi
private final long offset;
private TumblingProcessingTimeWindows(long size, long offset) {
- if (offset < 0 || offset >= size) {
- throw new IllegalArgumentException("TumblingProcessingTimeWindows parameters must satisfy 0 <= offset < size");
+ if (Math.abs(offset) >= size) {
+ throw new IllegalArgumentException("TumblingProcessingTimeWindows parameters must satisfy abs(offset) < size");
}
this.size = size;
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java
index 95a8314..6cf63af 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingEventTimeWindowsTest.java
@@ -105,6 +105,36 @@ public class SlidingEventTimeWindowsTest extends TestLogger {
}
@Test
+ public void testWindowAssignmentWithNegativeOffset() {
+ WindowAssigner.WindowAssignerContext mockContext =
+ mock(WindowAssigner.WindowAssignerContext.class);
+
+ SlidingEventTimeWindows assigner =
+ SlidingEventTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(1000), Time.milliseconds(-100));
+
+ assertThat(assigner.assignWindows("String", 0L, mockContext), containsInAnyOrder(
+ timeWindow(-4100, 900),
+ timeWindow(-3100, 1900),
+ timeWindow(-2100, 2900),
+ timeWindow(-1100, 3900),
+ timeWindow(-100, 4900)));
+
+ assertThat(assigner.assignWindows("String", 4899L, mockContext), containsInAnyOrder(
+ timeWindow(-100, 4900),
+ timeWindow(900, 5900),
+ timeWindow(1900, 6900),
+ timeWindow(2900, 7900),
+ timeWindow(3900, 8900)));
+
+ assertThat(assigner.assignWindows("String", 4900L, mockContext), containsInAnyOrder(
+ timeWindow(900, 5900),
+ timeWindow(1900, 6900),
+ timeWindow(2900, 7900),
+ timeWindow(3900, 8900),
+ timeWindow(4900, 9900)));
+ }
+
+ @Test
public void testTimeUnits() {
// sanity check with one other time unit
@@ -141,21 +171,35 @@ public class SlidingEventTimeWindowsTest extends TestLogger {
SlidingEventTimeWindows.of(Time.seconds(-2), Time.seconds(1));
fail("should fail");
} catch (IllegalArgumentException e) {
- assertThat(e.toString(), containsString("0 <= offset < slide and size > 0"));
+ assertThat(e.toString(), containsString("abs(offset) < slide and size > 0"));
}
try {
SlidingEventTimeWindows.of(Time.seconds(2), Time.seconds(-1));
fail("should fail");
} catch (IllegalArgumentException e) {
- assertThat(e.toString(), containsString("0 <= offset < slide and size > 0"));
+ assertThat(e.toString(), containsString("abs(offset) < slide and size > 0"));
+ }
+
+ try {
+ SlidingEventTimeWindows.of(Time.seconds(-20), Time.seconds(10), Time.seconds(-1));
+ fail("should fail");
+ } catch (IllegalArgumentException e) {
+ assertThat(e.toString(), containsString("abs(offset) < slide and size > 0"));
+ }
+
+ try {
+ SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(10), Time.seconds(-11));
+ fail("should fail");
+ } catch (IllegalArgumentException e) {
+ assertThat(e.toString(), containsString("abs(offset) < slide and size > 0"));
}
try {
- SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(10), Time.seconds(-1));
+ SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(10), Time.seconds(11));
fail("should fail");
} catch (IllegalArgumentException e) {
- assertThat(e.toString(), containsString("0 <= offset < slide and size > 0"));
+ assertThat(e.toString(), containsString("abs(offset) < slide and size > 0"));
}
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java
index 69b628a..5567a17 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/SlidingProcessingTimeWindowsTest.java
@@ -112,6 +112,39 @@ public class SlidingProcessingTimeWindowsTest extends TestLogger {
}
@Test
+ public void testWindowAssignmentWithNegativeOffset() {
+ WindowAssigner.WindowAssignerContext mockContext =
+ mock(WindowAssigner.WindowAssignerContext.class);
+
+ SlidingProcessingTimeWindows assigner =
+ SlidingProcessingTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(1000), Time.milliseconds(-100));
+
+ when(mockContext.getCurrentProcessingTime()).thenReturn(0L);
+ assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), containsInAnyOrder(
+ timeWindow(-4100, 900),
+ timeWindow(-3100, 1900),
+ timeWindow(-2100, 2900),
+ timeWindow(-1100, 3900),
+ timeWindow(-100, 4900)));
+
+ when(mockContext.getCurrentProcessingTime()).thenReturn(4899L);
+ assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), containsInAnyOrder(
+ timeWindow(-100, 4900),
+ timeWindow(900, 5900),
+ timeWindow(1900, 6900),
+ timeWindow(2900, 7900),
+ timeWindow(3900, 8900)));
+
+ when(mockContext.getCurrentProcessingTime()).thenReturn(4900L);
+ assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), containsInAnyOrder(
+ timeWindow(900, 5900),
+ timeWindow(1900, 6900),
+ timeWindow(2900, 7900),
+ timeWindow(3900, 8900),
+ timeWindow(4900, 9900)));
+ }
+
+ @Test
public void testTimeUnits() {
// sanity check with one other time unit
@@ -151,21 +184,35 @@ public class SlidingProcessingTimeWindowsTest extends TestLogger {
SlidingProcessingTimeWindows.of(Time.seconds(-2), Time.seconds(1));
fail("should fail");
} catch (IllegalArgumentException e) {
- assertThat(e.toString(), containsString("0 <= offset < slide and size > 0"));
+ assertThat(e.toString(), containsString("abs(offset) < slide and size > 0"));
}
try {
SlidingProcessingTimeWindows.of(Time.seconds(2), Time.seconds(-1));
fail("should fail");
} catch (IllegalArgumentException e) {
- assertThat(e.toString(), containsString("0 <= offset < slide and size > 0"));
+ assertThat(e.toString(), containsString("abs(offset) < slide and size > 0"));
+ }
+
+ try {
+ SlidingProcessingTimeWindows.of(Time.seconds(-20), Time.seconds(10), Time.seconds(-1));
+ fail("should fail");
+ } catch (IllegalArgumentException e) {
+ assertThat(e.toString(), containsString("abs(offset) < slide and size > 0"));
+ }
+
+ try {
+ SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(10), Time.seconds(-11));
+ fail("should fail");
+ } catch (IllegalArgumentException e) {
+ assertThat(e.toString(), containsString("abs(offset) < slide and size > 0"));
}
try {
- SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(10), Time.seconds(-1));
+ SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(10), Time.seconds(11));
fail("should fail");
} catch (IllegalArgumentException e) {
- assertThat(e.toString(), containsString("0 <= offset < slide and size > 0"));
+ assertThat(e.toString(), containsString("abs(offset) < slide and size > 0"));
}
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java
index 9e4669f..0451c15 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingEventTimeWindowsTest.java
@@ -69,6 +69,18 @@ public class TumblingEventTimeWindowsTest extends TestLogger {
}
@Test
+ public void testWindowAssignmentWithNegativeOffset() {
+ WindowAssigner.WindowAssignerContext mockContext =
+ mock(WindowAssigner.WindowAssignerContext.class);
+
+ TumblingEventTimeWindows assigner = TumblingEventTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(-100));
+
+ assertThat(assigner.assignWindows("String", 0L, mockContext), contains(timeWindow(-100, 4900)));
+ assertThat(assigner.assignWindows("String", 4899L, mockContext), contains(timeWindow(-100, 4900)));
+ assertThat(assigner.assignWindows("String", 4900L, mockContext), contains(timeWindow(4900, 9900)));
+ }
+
+ @Test
public void testTimeUnits() {
// sanity check with one other time unit
@@ -88,21 +100,21 @@ public class TumblingEventTimeWindowsTest extends TestLogger {
TumblingEventTimeWindows.of(Time.seconds(-1));
fail("should fail");
} catch (IllegalArgumentException e) {
- assertThat(e.toString(), containsString("0 <= offset < size"));
+ assertThat(e.toString(), containsString("abs(offset) < size"));
}
try {
TumblingEventTimeWindows.of(Time.seconds(10), Time.seconds(20));
fail("should fail");
} catch (IllegalArgumentException e) {
- assertThat(e.toString(), containsString("0 <= offset < size"));
+ assertThat(e.toString(), containsString("abs(offset) < size"));
}
try {
- TumblingEventTimeWindows.of(Time.seconds(10), Time.seconds(-1));
+ TumblingEventTimeWindows.of(Time.seconds(10), Time.seconds(-11));
fail("should fail");
} catch (IllegalArgumentException e) {
- assertThat(e.toString(), containsString("0 <= offset < size"));
+ assertThat(e.toString(), containsString("abs(offset) < size"));
}
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java
index a611fc0..1306252 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TumblingProcessingTimeWindowsTest.java
@@ -80,6 +80,23 @@ public class TumblingProcessingTimeWindowsTest extends TestLogger {
}
@Test
+ public void testWindowAssignmentWithNegativeOffset() {
+ WindowAssigner.WindowAssignerContext mockContext =
+ mock(WindowAssigner.WindowAssignerContext.class);
+
+ TumblingProcessingTimeWindows assigner = TumblingProcessingTimeWindows.of(Time.milliseconds(5000), Time.milliseconds(-100));
+
+ when(mockContext.getCurrentProcessingTime()).thenReturn(100L);
+ assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(-100, 4900)));
+
+ when(mockContext.getCurrentProcessingTime()).thenReturn(4899L);
+ assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(-100, 4900)));
+
+ when(mockContext.getCurrentProcessingTime()).thenReturn(4900L);
+ assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(4900, 9900)));
+ }
+
+ @Test
public void testTimeUnits() {
// sanity check with one other time unit
@@ -104,21 +121,21 @@ public class TumblingProcessingTimeWindowsTest extends TestLogger {
TumblingProcessingTimeWindows.of(Time.seconds(-1));
fail("should fail");
} catch (IllegalArgumentException e) {
- assertThat(e.toString(), containsString("0 <= offset < size"));
+ assertThat(e.toString(), containsString("abs(offset) < size"));
}
try {
TumblingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(20));
fail("should fail");
} catch (IllegalArgumentException e) {
- assertThat(e.toString(), containsString("0 <= offset < size"));
+ assertThat(e.toString(), containsString("abs(offset) < size"));
}
try {
- TumblingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(-1));
+ TumblingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(-11));
fail("should fail");
} catch (IllegalArgumentException e) {
- assertThat(e.toString(), containsString("0 <= offset < size"));
+ assertThat(e.toString(), containsString("abs(offset) < size"));
}
}