You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fp...@apache.org on 2022/03/21 08:51:14 UTC
[flink] branch master updated: [FLINK-26334][datastream] Modified getWindowStartWithOffset method in org.apache.flink.table.runtime.operators.window.TimeWindow and org.apache.flink.streaming.api.windowing.windows.TimeWindow . Added test cases in unit test TimeWindowTest.
This is an automated email from the ASF dual-hosted git repository.
fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c1bbbe6 [FLINK-26334][datastream] Modified getWindowStartWithOffset method in org.apache.flink.table.runtime.operators.window.TimeWindow and org.apache.flink.streaming.api.windowing.windows.TimeWindow . Added test cases in unit test TimeWindowTest.
c1bbbe6 is described below
commit c1bbbe6f0d148d109f903c5d3b207e9923a74e23
Author: realdengziqi <99...@qq.com>
AuthorDate: Sat Mar 5 02:01:22 2022 +0800
[FLINK-26334][datastream] Modified getWindowStartWithOffset method in org.apache.flink.table.runtime.operators.window.TimeWindow and org.apache.flink.streaming.api.windowing.windows.TimeWindow . Added test cases in unit test TimeWindowTest.
Co-authored-by: Lin WanNi <li...@foxmail.com>
Co-authored-by: Guo YuanFang <16...@qq.com>
---
.../streaming/api/windowing/windows/TimeWindow.java | 8 +++++++-
.../runtime/operators/windowing/TimeWindowTest.java | 21 ++++++++++++++++++---
.../table/runtime/operators/window/TimeWindow.java | 8 +++++++-
3 files changed, 32 insertions(+), 5 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
index 3346b39..93c1bf1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
@@ -262,6 +262,12 @@ public class TimeWindow extends Window {
* @return window start
*/
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
- return timestamp - (timestamp - offset + windowSize) % windowSize;
+ final long remainder = (timestamp - offset) % windowSize;
+ // handle both positive and negative cases
+ if (remainder < 0) {
+ return timestamp - (remainder + windowSize);
+ } else {
+ return timestamp - remainder;
+ }
}
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java
index 57a4367..0be5ed7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java
@@ -29,23 +29,38 @@ import java.util.concurrent.TimeUnit;
public class TimeWindowTest {
@Test
public void testGetWindowStartWithOffset() {
- // [0, 7), [7, 14), [14, 21)...
+ // [-21, -14), [-14, -7), [-7, 0), [0, 7), [7, 14), [14, 21)...
long offset = 0;
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-8, offset, 7), -14);
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-7, offset, 7), -7);
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-6, offset, 7), -7);
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-1, offset, 7), -7);
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1, offset, 7), 0);
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(6, offset, 7), 0);
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(7, offset, 7), 7);
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(8, offset, 7), 7);
- // [-4, 3), [3, 10), [10, 17)...
+ // [-11, -4), [-4, 3), [3, 10), [10, 17)...
offset = 3;
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-10, offset, 7), -11);
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-9, offset, 7), -11);
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-3, offset, 7), -4);
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-2, offset, 7), -4);
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-1, offset, 7), -4);
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1, offset, 7), -4);
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(2, offset, 7), -4);
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(3, offset, 7), 3);
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(9, offset, 7), 3);
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(10, offset, 7), 10);
- // [-2, 5), [5, 12), [12, 19)...
+ // [-16, -9), [-9, -2), [-2, 5), [5, 12), [12, 19)...
offset = -2;
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-12, offset, 7), -16);
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-7, offset, 7), -9);
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-4, offset, 7), -9);
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-3, offset, 7), -9);
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(2, offset, 7), -2);
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-1, offset, 7), -2);
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1, offset, 7), -2);
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-2, offset, 7), -2);
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(3, offset, 7), -2);
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/TimeWindow.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/TimeWindow.java
index e358c99..11e2ed3 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/TimeWindow.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/TimeWindow.java
@@ -220,7 +220,13 @@ public class TimeWindow extends Window {
* @return window start
*/
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
- return timestamp - (timestamp - offset + windowSize) % windowSize;
+ final long remainder = (timestamp - offset) % windowSize;
+ // handle both positive and negative cases
+ if (remainder < 0) {
+ return timestamp - (remainder + windowSize);
+ } else {
+ return timestamp - remainder;
+ }
}
public static TimeWindow of(long start, long end) {