You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fp...@apache.org on 2022/03/21 08:51:14 UTC

[flink] branch master updated: [FLINK-26334][datastream] Modified getWindowStartWithOffset method in org.apache.flink.table.runtime.operators.window.TimeWindow and org.apache.flink.streaming.api.windowing.windows.TimeWindow. Added test cases in unit test TimeWindowTest.

This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c1bbbe6  [FLINK-26334][datastream] Modified getWindowStartWithOffset method in org.apache.flink.table.runtime.operators.window.TimeWindow and org.apache.flink.streaming.api.windowing.windows.TimeWindow. Added test cases in unit test TimeWindowTest.
c1bbbe6 is described below

commit c1bbbe6f0d148d109f903c5d3b207e9923a74e23
Author: realdengziqi <99...@qq.com>
AuthorDate: Sat Mar 5 02:01:22 2022 +0800

    [FLINK-26334][datastream] Modified getWindowStartWithOffset method in org.apache.flink.table.runtime.operators.window.TimeWindow and org.apache.flink.streaming.api.windowing.windows.TimeWindow. Added test cases in unit test TimeWindowTest.
    
    Co-authored-by: Lin WanNi <li...@foxmail.com>
    Co-authored-by: Guo YuanFang <16...@qq.com>
---
 .../streaming/api/windowing/windows/TimeWindow.java |  8 +++++++-
 .../runtime/operators/windowing/TimeWindowTest.java | 21 ++++++++++++++++++---
 .../table/runtime/operators/window/TimeWindow.java  |  8 +++++++-
 3 files changed, 32 insertions(+), 5 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
index 3346b39..93c1bf1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
@@ -262,6 +262,12 @@ public class TimeWindow extends Window {
      * @return window start
      */
     public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
-        return timestamp - (timestamp - offset + windowSize) % windowSize;
+        final long remainder = (timestamp - offset) % windowSize;
+        // handle both positive and negative cases
+        if (remainder < 0) {
+            return timestamp - (remainder + windowSize);
+        } else {
+            return timestamp - remainder;
+        }
     }
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java
index 57a4367..0be5ed7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java
@@ -29,23 +29,38 @@ import java.util.concurrent.TimeUnit;
 public class TimeWindowTest {
     @Test
     public void testGetWindowStartWithOffset() {
-        // [0, 7), [7, 14), [14, 21)...
+        // [-21, -14), [-14, -7), [-7, 0), [0, 7), [7, 14), [14, 21)...
         long offset = 0;
+        Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-8, offset, 7), -14);
+        Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-7, offset, 7), -7);
+        Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-6, offset, 7), -7);
+        Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-1, offset, 7), -7);
         Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1, offset, 7), 0);
         Assert.assertEquals(TimeWindow.getWindowStartWithOffset(6, offset, 7), 0);
         Assert.assertEquals(TimeWindow.getWindowStartWithOffset(7, offset, 7), 7);
         Assert.assertEquals(TimeWindow.getWindowStartWithOffset(8, offset, 7), 7);
 
-        // [-4, 3), [3, 10), [10, 17)...
+        // [-11, -4), [-4, 3), [3, 10), [10, 17)...
         offset = 3;
+        Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-10, offset, 7), -11);
+        Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-9, offset, 7), -11);
+        Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-3, offset, 7), -4);
+        Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-2, offset, 7), -4);
+        Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-1, offset, 7), -4);
         Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1, offset, 7), -4);
         Assert.assertEquals(TimeWindow.getWindowStartWithOffset(2, offset, 7), -4);
         Assert.assertEquals(TimeWindow.getWindowStartWithOffset(3, offset, 7), 3);
         Assert.assertEquals(TimeWindow.getWindowStartWithOffset(9, offset, 7), 3);
         Assert.assertEquals(TimeWindow.getWindowStartWithOffset(10, offset, 7), 10);
 
-        // [-2, 5), [5, 12), [12, 19)...
+        // [-16, -9), [-9, -2), [-2, 5), [5, 12), [12, 19)...
         offset = -2;
+        Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-12, offset, 7), -16);
+        Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-7, offset, 7), -9);
+        Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-4, offset, 7), -9);
+        Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-3, offset, 7), -9);
+        Assert.assertEquals(TimeWindow.getWindowStartWithOffset(2, offset, 7), -2);
+        Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-1, offset, 7), -2);
         Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1, offset, 7), -2);
         Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-2, offset, 7), -2);
         Assert.assertEquals(TimeWindow.getWindowStartWithOffset(3, offset, 7), -2);
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/TimeWindow.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/TimeWindow.java
index e358c99..11e2ed3 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/TimeWindow.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/TimeWindow.java
@@ -220,7 +220,13 @@ public class TimeWindow extends Window {
      * @return window start
      */
     public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
-        return timestamp - (timestamp - offset + windowSize) % windowSize;
+        final long remainder = (timestamp - offset) % windowSize;
+        // handle both positive and negative cases
+        if (remainder < 0) {
+            return timestamp - (remainder + windowSize);
+        } else {
+            return timestamp - remainder;
+        }
     }
 
     public static TimeWindow of(long start, long end) {