You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/08/19 16:24:04 UTC
[2/2] flink git commit: [FLINK-4282] Add Offset Parameter to
WindowAssigners
[FLINK-4282] Add Offset Parameter to WindowAssigners
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/09774626
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/09774626
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/09774626
Branch: refs/heads/master
Commit: 09774626086564b184abbe09776d3b7033badd20
Parents: 3be9a28
Author: renkai <ga...@gmail.com>
Authored: Thu Aug 11 18:48:50 2016 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Aug 19 18:23:38 2016 +0200
----------------------------------------------------------------------
.../assigners/SlidingEventTimeWindows.java | 34 +++-
.../assigners/SlidingProcessingTimeWindows.java | 34 +++-
.../windowing/assigners/SlidingTimeWindows.java | 2 +-
.../assigners/TumblingEventTimeWindows.java | 35 +++-
.../TumblingProcessingTimeWindows.java | 34 +++-
.../assigners/TumblingTimeWindows.java | 2 +-
.../api/windowing/windows/TimeWindow.java | 12 ++
.../operators/windowing/TimeWindowTest.java | 59 +++++++
.../operators/windowing/WindowOperatorTest.java | 175 +++++++++++++++++++
9 files changed, 370 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/09774626/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
index 8fd0d25..16171a0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
@@ -52,16 +52,19 @@ public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow>
private final long slide;
- protected SlidingEventTimeWindows(long size, long slide) {
+ private final long offset;
+
+ protected SlidingEventTimeWindows(long size, long slide, long offset) {
this.size = size;
this.slide = slide;
+ this.offset = offset;
}
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
if (timestamp > Long.MIN_VALUE) {
List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
- long lastStart = timestamp - timestamp % slide;
+ long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
for (long start = lastStart;
start > timestamp - size;
start -= slide) {
@@ -102,7 +105,32 @@ public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow>
* @return The time policy.
*/
public static SlidingEventTimeWindows of(Time size, Time slide) {
- return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds());
+ return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), 0);
+ }
+
+ /**
+ * Creates a new {@code SlidingEventTimeWindows} {@link WindowAssigner} that assigns
+ * elements to time windows based on the element timestamp and offset.
+ *<p>
+ * For example, if you want window a stream by hour,but window begins at the 15th minutes
+ * of each hour, you can use {@code of(Time.hours(1),Time.minutes(15))},then you will get
+ * time windows start at 0:15:00,1:15:00,2:15:00,etc.
+ *</p>
+ *
+ * <p>
+ * Rather than that,if you are living in somewhere which is not using UTC�00:00 time,
+ * such as China which is using UTC+08:00,and you want a time window with size of one day,
+ * and window begins at every 00:00:00 of local time,you may use {@code of(Time.days(1),Time.hours(-8))}.
+ * The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than UTC time.
+ * </p>
+ * @param size The size of the generated windows.
+ * @param slide The slide interval of the generated windows.
+ * @param offset The offset which window start would be shifted by.
+ * @return The time policy.
+ */
+ public static SlidingEventTimeWindows of(Time size, Time slide, Time offset) {
+ return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(),
+ offset.toMilliseconds() % slide.toMilliseconds());
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/09774626/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
index 6a03640..e03467f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
@@ -47,18 +47,21 @@ public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWin
private final long size;
+ private final long offset;
+
private final long slide;
- private SlidingProcessingTimeWindows(long size, long slide) {
+ private SlidingProcessingTimeWindows(long size, long slide, long offset){
this.size = size;
this.slide = slide;
+ this.offset = offset;
}
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
timestamp = context.getCurrentProcessingTime();
List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
- long lastStart = timestamp - timestamp % slide;
+ long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
for (long start = lastStart;
start > timestamp - size;
start -= slide) {
@@ -94,7 +97,32 @@ public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWin
* @return The time policy.
*/
public static SlidingProcessingTimeWindows of(Time size, Time slide) {
- return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds());
+ return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), 0);
+ }
+
+ /**
+ * Creates a new {@code SlidingProcessingTimeWindows} {@link WindowAssigner} that assigns
+ * elements to time windows based on the element timestamp and offset.
+ *<p>
+ * For example, if you want window a stream by hour,but window begins at the 15th minutes
+ * of each hour, you can use {@code of(Time.hours(1),Time.minutes(15))},then you will get
+ * time windows start at 0:15:00,1:15:00,2:15:00,etc.
+ *</p>
+ *
+ * <p>
+ * Rather than that,if you are living in somewhere which is not using UTC�00:00 time,
+ * such as China which is using UTC+08:00,and you want a time window with size of one day,
+ * and window begins at every 00:00:00 of local time,you may use {@code of(Time.days(1),Time.hours(-8))}.
+ * The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than UTC time.
+ * </p>
+ * @param size The size of the generated windows.
+ * @param slide The slide interval of the generated windows.
+ * @param offset The offset which window start would be shifted by.
+ * @return The time policy.
+ */
+ public static SlidingProcessingTimeWindows of(Time size, Time slide, Time offset) {
+ return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds(),
+ offset.toMilliseconds() % slide.toMilliseconds());
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/09774626/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
index 581bbe1..41a5d53 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
@@ -33,7 +33,7 @@ public class SlidingTimeWindows extends SlidingEventTimeWindows {
private static final long serialVersionUID = 1L;
private SlidingTimeWindows(long size, long slide) {
- super(size, slide);
+ super(size, slide, 0);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/09774626/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
index 44464f0..b7fa343 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
@@ -47,17 +47,19 @@ import java.util.Collections;
public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
- private long size;
+ private final long size;
- protected TumblingEventTimeWindows(long size) {
+ private final long offset;
+
+ protected TumblingEventTimeWindows(long size, long offset){
this.size = size;
+ this.offset = offset;
}
-
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
if (timestamp > Long.MIN_VALUE) {
// Long.MIN_VALUE is currently assigned when no timestamp is present
- long start = timestamp - (timestamp % size);
+ long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
return Collections.singletonList(new TimeWindow(start, start + size));
} else {
throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
@@ -88,7 +90,30 @@ public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow>
* @return The time policy.
*/
public static TumblingEventTimeWindows of(Time size) {
- return new TumblingEventTimeWindows(size.toMilliseconds());
+ return new TumblingEventTimeWindows(size.toMilliseconds(), 0);
+ }
+
+ /**
+ * Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns
+ * elements to time windows based on the element timestamp and offset.
+ *<p>
+ * For example, if you want window a stream by hour,but window begins at the 15th minutes
+ * of each hour, you can use {@code of(Time.hours(1),Time.minutes(15))},then you will get
+ * time windows start at 0:15:00,1:15:00,2:15:00,etc.
+ *</p>
+ *
+ * <p>
+ * Rather than that,if you are living in somewhere which is not using UTC�00:00 time,
+ * such as China which is using UTC+08:00,and you want a time window with size of one day,
+ * and window begins at every 00:00:00 of local time,you may use {@code of(Time.days(1),Time.hours(-8))}.
+ * The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than UTC time.
+ * </p>
+ * @param size The size of the generated windows.
+ * @param offset The offset which window start would be shifted by.
+ * @return The time policy.
+ */
+ public static TumblingEventTimeWindows of(Time size, Time offset) {
+ return new TumblingEventTimeWindows(size.toMilliseconds(), offset.toMilliseconds() % size.toMilliseconds());
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/09774626/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
index ce36144..f1e9e11 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
@@ -44,16 +44,20 @@ import java.util.Collections;
public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
- private long size;
+ private final long size;
- private TumblingProcessingTimeWindows(long size) {
+ private final long offset;
+
+
+ private TumblingProcessingTimeWindows(long size,long offset) {
this.size = size;
+ this.offset = offset;
}
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
final long now = context.getCurrentProcessingTime();
- long start = now - (now % size);
+ long start = TimeWindow.getWindowStartWithOffset(now, offset, size);
return Collections.singletonList(new TimeWindow(start, start + size));
}
@@ -79,9 +83,31 @@ public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWi
* @return The time policy.
*/
public static TumblingProcessingTimeWindows of(Time size) {
- return new TumblingProcessingTimeWindows(size.toMilliseconds());
+ return new TumblingProcessingTimeWindows(size.toMilliseconds(), 0);
}
+ /**
+ * Creates a new {@code TumblingProcessingTimeWindows} {@link WindowAssigner} that assigns
+ * elements to time windows based on the element timestamp and offset.
+ *<p>
+ * For example, if you want window a stream by hour,but window begins at the 15th minutes
+ * of each hour, you can use {@code of(Time.hours(1),Time.minutes(15))},then you will get
+ * time windows start at 0:15:00,1:15:00,2:15:00,etc.
+ *</p>
+ *
+ * <p>
+ * Rather than that,if you are living in somewhere which is not using UTC�00:00 time,
+ * such as China which is using UTC+08:00,and you want a time window with size of one day,
+ * and window begins at every 00:00:00 of local time,you may use {@code of(Time.days(1),Time.hours(-8))}.
+ * The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than UTC time.
+ * </p>
+ * @param size The size of the generated windows.
+ * @param offset The offset which window start would be shifted by.
+ * @return The time policy.
+ */
+ public static TumblingProcessingTimeWindows of(Time size, Time offset) {
+ return new TumblingProcessingTimeWindows(size.toMilliseconds(), offset.toMilliseconds() % size.toMilliseconds());
+ }
@Override
public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
return new TimeWindow.Serializer();
http://git-wip-us.apache.org/repos/asf/flink/blob/09774626/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
index 156b1e9..589bce3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
@@ -33,7 +33,7 @@ public class TumblingTimeWindows extends TumblingEventTimeWindows {
private static final long serialVersionUID = 1L;
private TumblingTimeWindows(long size) {
- super(size);
+ super(size, 0);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/09774626/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
index 5dfd60b..0d5d091 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
@@ -227,4 +227,16 @@ public class TimeWindow extends Window {
}
}
}
+
+ /**
+ * Method to get the window start for a timestamp.
+ *
+ * @param timestamp epoch millisecond to get the window start.
+ * @param offset The offset which window start would be shifted by.
+ * @param windowSize The size of the generated windows.
+ * @return window start
+ */
+ public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
+ return timestamp - (timestamp - offset + windowSize) % windowSize;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/09774626/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java
new file mode 100644
index 0000000..9633671
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+public class TimeWindowTest {
+ @Test
+ public void testGetWindowStartWithOffset() {
+ //[0,7),[7,14),[14,21)...
+ long offset = 0;
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1,offset,7),0);
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(6,offset,7),0);
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(7,offset,7),7);
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(8,offset,7),7);
+
+ //[-4,3),[3,10),[10,17)...
+ offset = 3;
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1,offset,7),-4);
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(2,offset,7),-4);
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(3,offset,7),3);
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(9,offset,7),3);
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(10,offset,7),10);
+
+ //[-2,5),[5,12),[12,19)...
+ offset = -2;
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1,offset,7),-2);
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-2,offset,7),-2);
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(3,offset,7),-2);
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(4,offset,7),-2);
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(7,offset,7),5);
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(12,offset,7),12);
+
+ // for GMT+8:00
+ offset = - TimeUnit.HOURS.toMillis(8);
+ long size = TimeUnit.DAYS.toMillis(1);
+ Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1470902048450l,offset,size),1470844800000l);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/09774626/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 62266c4..2d7b615 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -66,6 +66,7 @@ import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.streaming.util.WindowingTestHarness;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;
@@ -2550,4 +2551,178 @@ public class WindowOperatorTest {
return "EventTimeTrigger()";
}
}
+
+ @Test
+ public void testEventTimeTumblingWindowsWithOffset() throws Exception {
+ final int WINDOW_SIZE = 2000;
+ final int OFFSET = 100;
+ TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+ TumblingEventTimeWindows windowAssigner = TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE),Time.milliseconds(OFFSET));
+
+ WindowingTestHarness<String, Tuple2<String, Integer>, TimeWindow> testHarness = new WindowingTestHarness<>(
+ new ExecutionConfig(),
+ windowAssigner,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ inputType,
+ new TupleKeySelector(),
+ EventTimeTrigger.create(),
+ 0);
+
+ // normal element
+ testHarness.processElement(new Tuple2<>("key2", 1), 1000);
+ testHarness.processWatermark(1985);
+
+ testHarness.addExpectedWatermark(1985);
+
+ testHarness.processElement(new Tuple2<>("key2", 2), 1980);
+ testHarness.processElement(new Tuple2<>("key2", 3), 1998);
+ testHarness.processElement(new Tuple2<>("key2", 4), 2001);
+
+ // verify that this does not yet fire our windows, as it would without offsets
+ testHarness.processWatermark(2010);
+ testHarness.addExpectedWatermark(2010);
+
+ testHarness.processWatermark(2999);
+
+ testHarness.addExpectedElement(new Tuple2<>("key2", 1), 1999 + OFFSET);
+ testHarness.addExpectedElement(new Tuple2<>("key2", 2), 1999 + OFFSET);
+ testHarness.addExpectedElement(new Tuple2<>("key2", 3), 1999 + OFFSET);
+ testHarness.addExpectedElement(new Tuple2<>("key2", 4), 1999 + OFFSET);
+
+ testHarness.addExpectedWatermark(2999);
+
+ testHarness.processWatermark(3999);
+ testHarness.addExpectedWatermark(3999);
+
+ testHarness.compareActualToExpectedOutput("Output is not correct");
+
+ testHarness.close();
+ }
+
+ @Test
+ public void testEventTimeSlidingWindowsWithOffset() throws Exception {
+ final int WINDOW_SIZE = 2000;
+ final int SLIDE = 500;
+ final int OFFSET = 10;
+ TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+ SlidingEventTimeWindows windowAssigner = SlidingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE),Time.milliseconds(SLIDE),Time.milliseconds(OFFSET));
+
+ WindowingTestHarness<String, Tuple2<String, Integer>, TimeWindow> testHarness = new WindowingTestHarness<>(
+ new ExecutionConfig(),
+ windowAssigner,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ inputType,
+ new TupleKeySelector(),
+ EventTimeTrigger.create(),
+ 0);
+
+ testHarness.processElement(new Tuple2<>("key2", 1), 333);
+ testHarness.processWatermark(6666);
+
+ testHarness.addExpectedElement(new Tuple2<>("key2",1),499 + OFFSET);
+ testHarness.addExpectedElement(new Tuple2<>("key2",1),999 + OFFSET);
+ testHarness.addExpectedElement(new Tuple2<>("key2",1),1499 + OFFSET);
+ testHarness.addExpectedElement(new Tuple2<>("key2",1),1999 + OFFSET);
+ testHarness.addExpectedWatermark(6666);
+ testHarness.compareActualToExpectedOutput("Output is not correct");
+
+ testHarness.close();
+ }
+
+ @Test
+ public void testProcessingTimeTumblingWindowsWithOffset() throws Exception {
+ final int WINDOW_SIZE = 3000;
+ final int OFFSET = 1000;
+
+ TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+ TumblingProcessingTimeWindows windowAssigner = TumblingProcessingTimeWindows.of(Time.milliseconds(WINDOW_SIZE),
+ Time.milliseconds(OFFSET));
+
+ WindowingTestHarness<String, Tuple2<String, Integer>, TimeWindow> testHarness = new WindowingTestHarness<>(
+ new ExecutionConfig(),
+ windowAssigner,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ inputType,
+ new TupleKeySelector(),
+ ProcessingTimeTrigger.create(),
+ 0);
+
+ testHarness.setProcessingTime(3);
+
+ // timestamp is ignored in processing time
+ testHarness.processElement(new Tuple2<>("key2", 1), Long.MAX_VALUE);
+ testHarness.processElement(new Tuple2<>("key2", 1), 7000);
+ testHarness.processElement(new Tuple2<>("key2", 1), 7000);
+
+ testHarness.processElement(new Tuple2<>("key1", 1), 7000);
+ testHarness.processElement(new Tuple2<>("key1", 1), 7000);
+
+ testHarness.setProcessingTime(5000);
+
+ testHarness.addExpectedElement(new Tuple2<>("key2", 1), 999);
+ testHarness.addExpectedElement(new Tuple2<>("key2", 1), 999);
+ testHarness.addExpectedElement(new Tuple2<>("key2", 1), 999);
+ testHarness.addExpectedElement(new Tuple2<>("key1", 1), 999);
+ testHarness.addExpectedElement(new Tuple2<>("key1", 1), 999);
+
+ testHarness.compareActualToExpectedOutput("Output was not correct.");
+
+ testHarness.processElement(new Tuple2<>("key1", 1), 7000);
+ testHarness.processElement(new Tuple2<>("key1", 1), 7000);
+ testHarness.processElement(new Tuple2<>("key1", 1), 7000);
+
+ testHarness.setProcessingTime(7000);
+
+ testHarness.addExpectedElement(new Tuple2<>("key1", 1), 6999);
+ testHarness.addExpectedElement(new Tuple2<>("key1", 1), 6999);
+ testHarness.addExpectedElement(new Tuple2<>("key1", 1), 6999);
+
+ testHarness.compareActualToExpectedOutput("Output was not correct.");
+
+ testHarness.close();
+ }
+
+ @Test
+ public void testProcessingTimeSlidingWindowsWithOffset() throws Exception {
+ final int WINDOW_SIZE = 3000;
+ final int SLIDING = 1000;
+ final int OFFSET = 10;
+
+ TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+ SlidingProcessingTimeWindows windowAssigner = SlidingProcessingTimeWindows.of(Time.milliseconds(WINDOW_SIZE),
+ Time.milliseconds(SLIDING),Time.milliseconds(OFFSET));
+
+ WindowingTestHarness<String, Tuple2<String, Integer>, TimeWindow> testHarness = new WindowingTestHarness<>(
+ new ExecutionConfig(),
+ windowAssigner,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ inputType,
+ new TupleKeySelector(),
+ ProcessingTimeTrigger.create(),
+ 0);
+
+ testHarness.setProcessingTime(3);
+
+ // timestamp is ignored in processing time
+ testHarness.processElement(new Tuple2<>("key2", 1), Long.MAX_VALUE);
+
+ testHarness.setProcessingTime(1111);
+
+ testHarness.addExpectedElement(new Tuple2<>("key2", 1), OFFSET - 1);
+ testHarness.addExpectedElement(new Tuple2<>("key2", 1), OFFSET + 999);
+
+ testHarness.processElement(new Tuple2<>("key2", 2),Long.MIN_VALUE);
+ testHarness.setProcessingTime(2222);
+
+ testHarness.addExpectedElement(new Tuple2<>("key2", 1), OFFSET + 1999);
+ testHarness.addExpectedElement(new Tuple2<>("key2", 2), OFFSET + 1999);
+
+ testHarness.compareActualToExpectedOutput("Output was not correct.");
+
+ testHarness.close();
+ }
}