You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/08/19 16:24:04 UTC

[2/2] flink git commit: [FLINK-4282] Add Offset Parameter to WindowAssigners

[FLINK-4282] Add Offset Parameter to WindowAssigners


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/09774626
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/09774626
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/09774626

Branch: refs/heads/master
Commit: 09774626086564b184abbe09776d3b7033badd20
Parents: 3be9a28
Author: renkai <ga...@gmail.com>
Authored: Thu Aug 11 18:48:50 2016 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Aug 19 18:23:38 2016 +0200

----------------------------------------------------------------------
 .../assigners/SlidingEventTimeWindows.java      |  34 +++-
 .../assigners/SlidingProcessingTimeWindows.java |  34 +++-
 .../windowing/assigners/SlidingTimeWindows.java |   2 +-
 .../assigners/TumblingEventTimeWindows.java     |  35 +++-
 .../TumblingProcessingTimeWindows.java          |  34 +++-
 .../assigners/TumblingTimeWindows.java          |   2 +-
 .../api/windowing/windows/TimeWindow.java       |  12 ++
 .../operators/windowing/TimeWindowTest.java     |  59 +++++++
 .../operators/windowing/WindowOperatorTest.java | 175 +++++++++++++++++++
 9 files changed, 370 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/09774626/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
index 8fd0d25..16171a0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
@@ -52,16 +52,19 @@ public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow>
 
 	private final long slide;
 
-	protected SlidingEventTimeWindows(long size, long slide) {
+	private final long offset;
+
+	protected SlidingEventTimeWindows(long size, long slide, long offset) {
 		this.size = size;
 		this.slide = slide;
+		this.offset = offset;
 	}
 
 	@Override
 	public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
 		if (timestamp > Long.MIN_VALUE) {
 			List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
-			long lastStart = timestamp - timestamp % slide;
+			long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
 			for (long start = lastStart;
 				start > timestamp - size;
 				start -= slide) {
@@ -102,7 +105,32 @@ public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow>
 	 * @return The time policy.
 	 */
 	public static SlidingEventTimeWindows of(Time size, Time slide) {
-		return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds());
+		return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), 0);
+	}
+
+	/**
+	 *  Creates a new {@code SlidingEventTimeWindows} {@link WindowAssigner} that assigns
+	 *  elements to time windows based on the element timestamp and offset.
+	 *<p>
+	 *     For example, if you want to window a stream by hour, but have windows begin at the 15th minute
+	 *     of each hour, you can use {@code of(Time.hours(1), Time.hours(1), Time.minutes(15))}; you will then
+	 *     get time windows that start at 0:15:00, 1:15:00, 2:15:00, etc.
+	 *</p>
+	 *
+	 * <p>
+	 *     Alternatively, if you live somewhere that does not use UTC±00:00 time,
+	 *     such as China, which uses UTC+08:00, and you want time windows with a size of one day
+	 *     that begin at 00:00:00 local time, you may use {@code of(Time.days(1), Time.days(1), Time.hours(-8))}.
+	 *     The offset is {@code Time.hours(-8)} because midnight in UTC+08:00 falls 8 hours before midnight UTC.
+	 * </p>
+	 * @param size The size of the generated windows.
+	 * @param slide  The slide interval of the generated windows.
+	 * @param offset The offset which window start would be shifted by; it is taken modulo the slide.
+	 * @return The time policy.
+	 */
+	public static SlidingEventTimeWindows of(Time size, Time slide, Time offset) {
+		return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(),
+			offset.toMilliseconds() % slide.toMilliseconds());
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/09774626/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
index 6a03640..e03467f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
@@ -47,18 +47,21 @@ public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWin
 
 	private final long size;
 
+	private final long offset;
+
 	private final long slide;
 
-	private SlidingProcessingTimeWindows(long size, long slide) {
+	private SlidingProcessingTimeWindows(long size, long slide, long offset){
 		this.size = size;
 		this.slide = slide;
+		this.offset = offset;
 	}
 
 	@Override
 	public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
 		timestamp = context.getCurrentProcessingTime();
 		List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
-		long lastStart = timestamp - timestamp % slide;
+		long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
 		for (long start = lastStart;
 			start > timestamp - size;
 			start -= slide) {
@@ -94,7 +97,32 @@ public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWin
 	 * @return The time policy.
 	 */
 	public static SlidingProcessingTimeWindows of(Time size, Time slide) {
-		return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds());
+		return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), 0);
+	}
+
+	/**
+	 *  Creates a new {@code SlidingProcessingTimeWindows} {@link WindowAssigner} that assigns
+	 *  elements to time windows based on the current processing time and offset.
+	 *<p>
+	 *     For example, if you want to window a stream by hour, but have windows begin at the 15th minute
+	 *     of each hour, you can use {@code of(Time.hours(1), Time.hours(1), Time.minutes(15))}; you will then
+	 *     get time windows that start at 0:15:00, 1:15:00, 2:15:00, etc.
+	 *</p>
+	 *
+	 * <p>
+	 *     Alternatively, if you live somewhere that does not use UTC±00:00 time,
+	 *     such as China, which uses UTC+08:00, and you want time windows with a size of one day
+	 *     that begin at 00:00:00 local time, you may use {@code of(Time.days(1), Time.days(1), Time.hours(-8))}.
+	 *     The offset is {@code Time.hours(-8)} because midnight in UTC+08:00 falls 8 hours before midnight UTC.
+	 * </p>
+	 * @param size The size of the generated windows.
+	 * @param slide  The slide interval of the generated windows.
+	 * @param offset The offset which window start would be shifted by; it is taken modulo the slide.
+	 * @return The time policy.
+	 */
+	public static SlidingProcessingTimeWindows of(Time size, Time slide, Time offset) {
+		return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds(),
+			offset.toMilliseconds() % slide.toMilliseconds());
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/09774626/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
index 581bbe1..41a5d53 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
@@ -33,7 +33,7 @@ public class SlidingTimeWindows extends SlidingEventTimeWindows {
 	private static final long serialVersionUID = 1L;
 
 	private SlidingTimeWindows(long size, long slide) {
-		super(size, slide);
+		super(size, slide, 0);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/09774626/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
index 44464f0..b7fa343 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
@@ -47,17 +47,19 @@ import java.util.Collections;
 public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
 	private static final long serialVersionUID = 1L;
 
-	private long size;
+	private final long size;
 
-	protected TumblingEventTimeWindows(long size) {
+	private final long offset;
+
+	protected TumblingEventTimeWindows(long size, long offset){
 		this.size = size;
+		this.offset = offset;
 	}
-
 	@Override
 	public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
 		if (timestamp > Long.MIN_VALUE) {
 			// Long.MIN_VALUE is currently assigned when no timestamp is present
-			long start = timestamp - (timestamp % size);
+			long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
 			return Collections.singletonList(new TimeWindow(start, start + size));
 		} else {
 			throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
@@ -88,7 +90,30 @@ public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow>
 	 * @return The time policy.
 	 */
 	public static TumblingEventTimeWindows of(Time size) {
-		return new TumblingEventTimeWindows(size.toMilliseconds());
+		return new TumblingEventTimeWindows(size.toMilliseconds(), 0);
+	}
+
+	/**
+	 *  Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns
+	 *  elements to time windows based on the element timestamp and offset.
+	 *<p>
+	 *     For example, if you want to window a stream by hour, but have windows begin at the 15th minute
+	 *     of each hour, you can use {@code of(Time.hours(1), Time.minutes(15))}; you will then get
+	 *     time windows that start at 0:15:00, 1:15:00, 2:15:00, etc.
+	 *</p>
+	 *
+	 * <p>
+	 *     Alternatively, if you live somewhere that does not use UTC±00:00 time,
+	 *     such as China, which uses UTC+08:00, and you want time windows with a size of one day
+	 *     that begin at 00:00:00 local time, you may use {@code of(Time.days(1), Time.hours(-8))}.
+	 *     The offset is {@code Time.hours(-8)} because midnight in UTC+08:00 falls 8 hours before midnight UTC.
+	 * </p>
+	 * @param size The size of the generated windows.
+	 * @param offset The offset which window start would be shifted by; it is taken modulo the window size.
+	 * @return The time policy.
+	 */
+	public static TumblingEventTimeWindows of(Time size, Time offset) {
+		return new TumblingEventTimeWindows(size.toMilliseconds(), offset.toMilliseconds() % size.toMilliseconds());
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/09774626/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
index ce36144..f1e9e11 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
@@ -44,16 +44,20 @@ import java.util.Collections;
 public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
 	private static final long serialVersionUID = 1L;
 
-	private long size;
+	private final long size;
 
-	private TumblingProcessingTimeWindows(long size) {
+	private final long offset;
+
+
+	private TumblingProcessingTimeWindows(long size,long offset) {
 		this.size = size;
+		this.offset = offset;
 	}
 
 	@Override
 	public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
 		final long now = context.getCurrentProcessingTime();
-		long start = now - (now % size);
+		long start = TimeWindow.getWindowStartWithOffset(now, offset, size);
 		return Collections.singletonList(new TimeWindow(start, start + size));
 	}
 
@@ -79,9 +83,31 @@ public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWi
 	 * @return The time policy.
 	 */
 	public static TumblingProcessingTimeWindows of(Time size) {
-		return new TumblingProcessingTimeWindows(size.toMilliseconds());
+		return new TumblingProcessingTimeWindows(size.toMilliseconds(), 0);
 	}
 
+	/**
+	 *  Creates a new {@code TumblingProcessingTimeWindows} {@link WindowAssigner} that assigns
+	 *  elements to time windows based on the current processing time and offset.
+	 *<p>
+	 *     For example, if you want to window a stream by hour, but have windows begin at the 15th minute
+	 *     of each hour, you can use {@code of(Time.hours(1), Time.minutes(15))}; you will then get
+	 *     time windows that start at 0:15:00, 1:15:00, 2:15:00, etc.
+	 *</p>
+	 *
+	 * <p>
+	 *     Alternatively, if you live somewhere that does not use UTC±00:00 time,
+	 *     such as China, which uses UTC+08:00, and you want time windows with a size of one day
+	 *     that begin at 00:00:00 local time, you may use {@code of(Time.days(1), Time.hours(-8))}.
+	 *     The offset is {@code Time.hours(-8)} because midnight in UTC+08:00 falls 8 hours before midnight UTC.
+	 * </p>
+	 * @param size The size of the generated windows.
+	 * @param offset The offset which window start would be shifted by; it is taken modulo the window size.
+	 * @return The time policy.
+	 */
+	public static TumblingProcessingTimeWindows of(Time size, Time offset) {
+		return new TumblingProcessingTimeWindows(size.toMilliseconds(), offset.toMilliseconds() % size.toMilliseconds());
+	}
 	@Override
 	public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
 		return new TimeWindow.Serializer();

http://git-wip-us.apache.org/repos/asf/flink/blob/09774626/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
index 156b1e9..589bce3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
@@ -33,7 +33,7 @@ public class TumblingTimeWindows extends TumblingEventTimeWindows {
 	private static final long serialVersionUID = 1L;
 
 	private TumblingTimeWindows(long size) {
-		super(size);
+		super(size, 0);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/09774626/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
index 5dfd60b..0d5d091 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
@@ -227,4 +227,16 @@ public class TimeWindow extends Window {
 			}
 		}
 	}
+
+	/**
+	 * Returns the start of the window that contains {@code timestamp}, i.e. the largest value
+	 * {@code offset + k * windowSize} (for an integer k) that is not greater than {@code timestamp}.
+	 * @param timestamp epoch millisecond to get the window start for; may be negative.
+	 * @param offset The offset which window start would be shifted by.
+	 * @param windowSize The size of the generated windows; expected to be positive.
+	 * @return window start
+	 */
+	public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
+		return timestamp - ((timestamp - offset) % windowSize + windowSize) % windowSize; // non-negative remainder
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/09774626/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java
new file mode 100644
index 0000000..9633671
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+public class TimeWindowTest {
+	@Test
+	public void testGetWindowStartWithOffset() {
+		// offset 0, size 7: windows are [0,7),[7,14),[14,21)... (assertEquals takes expected first, then actual)
+		long offset = 0;
+		Assert.assertEquals(0, TimeWindow.getWindowStartWithOffset(1, offset, 7));
+		Assert.assertEquals(0, TimeWindow.getWindowStartWithOffset(6, offset, 7));
+		Assert.assertEquals(7, TimeWindow.getWindowStartWithOffset(7, offset, 7));
+		Assert.assertEquals(7, TimeWindow.getWindowStartWithOffset(8, offset, 7));
+
+		// offset 3, size 7: windows are [-4,3),[3,10),[10,17)...
+		offset = 3;
+		Assert.assertEquals(-4, TimeWindow.getWindowStartWithOffset(1, offset, 7));
+		Assert.assertEquals(-4, TimeWindow.getWindowStartWithOffset(2, offset, 7));
+		Assert.assertEquals(3, TimeWindow.getWindowStartWithOffset(3, offset, 7));
+		Assert.assertEquals(3, TimeWindow.getWindowStartWithOffset(9, offset, 7));
+		Assert.assertEquals(10, TimeWindow.getWindowStartWithOffset(10, offset, 7));
+
+		// offset -2, size 7: windows are [-2,5),[5,12),[12,19)...
+		offset = -2;
+		Assert.assertEquals(-2, TimeWindow.getWindowStartWithOffset(1, offset, 7));
+		Assert.assertEquals(-2, TimeWindow.getWindowStartWithOffset(-2, offset, 7));
+		Assert.assertEquals(-2, TimeWindow.getWindowStartWithOffset(3, offset, 7));
+		Assert.assertEquals(-2, TimeWindow.getWindowStartWithOffset(4, offset, 7));
+		Assert.assertEquals(5, TimeWindow.getWindowStartWithOffset(7, offset, 7));
+		Assert.assertEquals(12, TimeWindow.getWindowStartWithOffset(12, offset, 7));
+
+		// for GMT+8:00: one-day windows shifted by -8 hours start at 16:00 UTC, i.e. local midnight
+		offset = - TimeUnit.HOURS.toMillis(8);
+		long size = TimeUnit.DAYS.toMillis(1);
+		Assert.assertEquals(1470844800000L, TimeWindow.getWindowStartWithOffset(1470902048450L, offset, size));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09774626/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 62266c4..2d7b615 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -66,6 +66,7 @@ import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.streaming.util.WindowingTestHarness;
 import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.junit.Test;
@@ -2550,4 +2551,178 @@ public class WindowOperatorTest {
 			return "EventTimeTrigger()";
 		}
 	}
+
+	@Test
+	public void testEventTimeTumblingWindowsWithOffset() throws Exception {
+		final int WINDOW_SIZE = 2000;
+		final int OFFSET = 100;
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+		// Tumbling event-time windows of size 2000 with offset 100: the window holding the elements below is [100, 2100).
+		TumblingEventTimeWindows windowAssigner = TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE),Time.milliseconds(OFFSET));
+
+		WindowingTestHarness<String, Tuple2<String, Integer>, TimeWindow> testHarness = new WindowingTestHarness<>(
+			new ExecutionConfig(),
+			windowAssigner,
+			BasicTypeInfo.STRING_TYPE_INFO,
+			inputType,
+			new TupleKeySelector(),
+			EventTimeTrigger.create(),
+			0);
+
+		// normal element
+		testHarness.processElement(new Tuple2<>("key2", 1), 1000);
+		testHarness.processWatermark(1985);
+
+		testHarness.addExpectedWatermark(1985);
+
+		testHarness.processElement(new Tuple2<>("key2", 2), 1980);
+		testHarness.processElement(new Tuple2<>("key2", 3), 1998);
+		testHarness.processElement(new Tuple2<>("key2", 4), 2001);
+
+		// verify that this does not yet fire our windows, as it would without offsets
+		testHarness.processWatermark(2010);
+		testHarness.addExpectedWatermark(2010);
+
+		testHarness.processWatermark(2999);
+		// The expectations below use 1999 + OFFSET = 2099, the last millisecond of window [100, 2100).
+		testHarness.addExpectedElement(new Tuple2<>("key2", 1), 1999 + OFFSET);
+		testHarness.addExpectedElement(new Tuple2<>("key2", 2), 1999 + OFFSET);
+		testHarness.addExpectedElement(new Tuple2<>("key2", 3), 1999 + OFFSET);
+		testHarness.addExpectedElement(new Tuple2<>("key2", 4), 1999 + OFFSET);
+
+		testHarness.addExpectedWatermark(2999);
+
+		testHarness.processWatermark(3999);
+		testHarness.addExpectedWatermark(3999);
+
+		testHarness.compareActualToExpectedOutput("Output is not correct");
+
+		testHarness.close();
+	}
+
+	@Test
+	public void testEventTimeSlidingWindowsWithOffset() throws Exception {
+		final int WINDOW_SIZE = 2000;
+		final int SLIDE = 500;
+		final int OFFSET = 10;
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+		// Sliding event-time windows: size 2000, slide 500, offset 10, so window starts are 10 + k * 500.
+		SlidingEventTimeWindows windowAssigner = SlidingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE),Time.milliseconds(SLIDE),Time.milliseconds(OFFSET));
+
+		WindowingTestHarness<String, Tuple2<String, Integer>, TimeWindow> testHarness = new WindowingTestHarness<>(
+			new ExecutionConfig(),
+			windowAssigner,
+			BasicTypeInfo.STRING_TYPE_INFO,
+			inputType,
+			new TupleKeySelector(),
+			EventTimeTrigger.create(),
+			0);
+
+		testHarness.processElement(new Tuple2<>("key2", 1), 333);
+		testHarness.processWatermark(6666);
+		// Timestamp 333 lies in [-1490, 510), [-990, 1010), [-490, 1510) and [10, 2010); each expectation is a window end - 1.
+		testHarness.addExpectedElement(new Tuple2<>("key2",1),499 + OFFSET);
+		testHarness.addExpectedElement(new Tuple2<>("key2",1),999 + OFFSET);
+		testHarness.addExpectedElement(new Tuple2<>("key2",1),1499 + OFFSET);
+		testHarness.addExpectedElement(new Tuple2<>("key2",1),1999 + OFFSET);
+		testHarness.addExpectedWatermark(6666);
+		testHarness.compareActualToExpectedOutput("Output is not correct");
+
+		testHarness.close();
+	}
+
+	@Test
+	public void testProcessingTimeTumblingWindowsWithOffset() throws Exception {
+		final int WINDOW_SIZE = 3000;
+		final int OFFSET = 1000;
+		// Tumbling processing-time windows of size 3000 with offset 1000: window starts are 1000 + k * 3000.
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		TumblingProcessingTimeWindows windowAssigner = TumblingProcessingTimeWindows.of(Time.milliseconds(WINDOW_SIZE),
+			Time.milliseconds(OFFSET));
+
+		WindowingTestHarness<String, Tuple2<String, Integer>, TimeWindow> testHarness = new WindowingTestHarness<>(
+			new ExecutionConfig(),
+			windowAssigner,
+			BasicTypeInfo.STRING_TYPE_INFO,
+			inputType,
+			new TupleKeySelector(),
+			ProcessingTimeTrigger.create(),
+			0);
+
+		testHarness.setProcessingTime(3);
+
+		// timestamp is ignored in processing time
+		testHarness.processElement(new Tuple2<>("key2", 1), Long.MAX_VALUE);
+		testHarness.processElement(new Tuple2<>("key2", 1), 7000);
+		testHarness.processElement(new Tuple2<>("key2", 1), 7000);
+
+		testHarness.processElement(new Tuple2<>("key1", 1), 7000);
+		testHarness.processElement(new Tuple2<>("key1", 1), 7000);
+
+		testHarness.setProcessingTime(5000);
+		// Processing time 3 lies in window [-2000, 1000); the expectations use 999, its last millisecond.
+		testHarness.addExpectedElement(new Tuple2<>("key2", 1), 999);
+		testHarness.addExpectedElement(new Tuple2<>("key2", 1), 999);
+		testHarness.addExpectedElement(new Tuple2<>("key2", 1), 999);
+		testHarness.addExpectedElement(new Tuple2<>("key1", 1), 999);
+		testHarness.addExpectedElement(new Tuple2<>("key1", 1), 999);
+
+		testHarness.compareActualToExpectedOutput("Output was not correct.");
+
+		testHarness.processElement(new Tuple2<>("key1", 1), 7000);
+		testHarness.processElement(new Tuple2<>("key1", 1), 7000);
+		testHarness.processElement(new Tuple2<>("key1", 1), 7000);
+
+		testHarness.setProcessingTime(7000);
+		// Processing time 5000 lies in window [4000, 7000); the expectations use 6999, its last millisecond.
+		testHarness.addExpectedElement(new Tuple2<>("key1", 1), 6999);
+		testHarness.addExpectedElement(new Tuple2<>("key1", 1), 6999);
+		testHarness.addExpectedElement(new Tuple2<>("key1", 1), 6999);
+
+		testHarness.compareActualToExpectedOutput("Output was not correct.");
+
+		testHarness.close();
+	}
+
+	@Test
+	public void testProcessingTimeSlidingWindowsWithOffset() throws Exception {
+		final int WINDOW_SIZE = 3000;
+		final int SLIDING = 1000;
+		final int OFFSET = 10;
+		// Sliding processing-time windows: size 3000, slide 1000, offset 10, so window starts are 10 + k * 1000.
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		SlidingProcessingTimeWindows windowAssigner = SlidingProcessingTimeWindows.of(Time.milliseconds(WINDOW_SIZE),
+			Time.milliseconds(SLIDING),Time.milliseconds(OFFSET));
+
+		WindowingTestHarness<String, Tuple2<String, Integer>, TimeWindow> testHarness = new WindowingTestHarness<>(
+			new ExecutionConfig(),
+			windowAssigner,
+			BasicTypeInfo.STRING_TYPE_INFO,
+			inputType,
+			new TupleKeySelector(),
+			ProcessingTimeTrigger.create(),
+			0);
+
+		testHarness.setProcessingTime(3);
+
+		// timestamp is ignored in processing time
+		testHarness.processElement(new Tuple2<>("key2", 1), Long.MAX_VALUE);
+
+		testHarness.setProcessingTime(1111);
+		// Processing time 3 lies in [-2990, 10), [-1990, 1010) and [-990, 2010); the first two end (minus 1) at 9 and 1009.
+		testHarness.addExpectedElement(new Tuple2<>("key2", 1), OFFSET - 1);
+		testHarness.addExpectedElement(new Tuple2<>("key2", 1), OFFSET + 999);
+
+		testHarness.processElement(new Tuple2<>("key2", 2),Long.MIN_VALUE);
+		testHarness.setProcessingTime(2222);
+		// Both elements share window [-990, 2010), whose last millisecond is OFFSET + 1999 = 2009.
+		testHarness.addExpectedElement(new Tuple2<>("key2", 1), OFFSET + 1999);
+		testHarness.addExpectedElement(new Tuple2<>("key2", 2), OFFSET + 1999);
+
+		testHarness.compareActualToExpectedOutput("Output was not correct.");
+
+		testHarness.close();
+	}
 }