You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/01/11 11:15:49 UTC

[4/6] flink git commit: [FLINK-5237] Consolidate and harmonize Window Translation Tests

[FLINK-5237] Consolidate and harmonize Window Translation Tests


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aa220e48
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aa220e48
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aa220e48

Branch: refs/heads/master
Commit: aa220e487db88079c224e50190055cef41df3f9a
Parents: 8c8c028
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Nov 24 08:14:48 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Jan 11 10:35:47 2017 +0100

----------------------------------------------------------------------
 .../windowing/AllWindowTranslationTest.java     |  841 ++++++++++---
 .../windowing/TimeWindowTranslationTest.java    |  112 +-
 .../windowing/WindowTranslationTest.java        |  844 ++++++++++---
 flink-streaming-scala/pom.xml                   |    8 +
 .../api/scala/AllWindowTranslationTest.scala    | 1086 ++++++++++++++---
 .../api/scala/TimeWindowTranslationTest.scala   |  182 +++
 .../api/scala/WindowTranslationTest.scala       | 1131 +++++++++++++++---
 7 files changed, 3561 insertions(+), 643 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/aa220e48/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
index 72b0850..3d4de5d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,121 +17,548 @@
  */
 package org.apache.flink.streaming.runtime.operators.windowing;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichFoldFunction;
+import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.AllWindowedStream;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
 import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.util.Collector;
 
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.util.concurrent.TimeUnit;
 
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
- * These tests verify that the api calls on
- * {@link org.apache.flink.streaming.api.datastream.AllWindowedStream} instantiate
- * the correct window operator.
+ * These tests verify that the API calls on {@link AllWindowedStream} instantiate the correct
+ * window operator.
+ *
+ * <p>We also create a test harness and push one element into the operator to verify
+ * that we get some output.
  */
+@SuppressWarnings("serial")
 public class AllWindowTranslationTest {
 
 	/**
-	 * These tests ensure that the correct trigger is set when using event-time windows.
+	 * .reduce() does not support RichReduceFunction, since the reduce function is used internally
+	 * in a {@code ReducingState}.
 	 */
-	@Test
-	@SuppressWarnings("rawtypes")
-	public void testEventTime() throws Exception {
+	@Test(expected = UnsupportedOperationException.class)
+	public void testReduceWithRichReducerFails() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+		source
+				.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+				.reduce(new RichReduceFunction<Tuple2<String, Integer>>() {
+					private static final long serialVersionUID = -6448847205314995812L;
+
+					@Override
+					public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
+							Tuple2<String, Integer> value2) throws Exception {
+						return null;
+					}
+				});
+
+		fail("exception was not thrown");
+	}
+
+	/**
+	 * .fold() does not support RichFoldFunction, since the fold function is used internally
+	 * in a {@code FoldingState}.
+	 */
+	@Test(expected = UnsupportedOperationException.class)
+	public void testFoldWithRichFolderFails() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+		source
+				.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+				.fold(new Tuple2<>("", 0), new RichFoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
+					private static final long serialVersionUID = -6448847205314995812L;
+
+					@Override
+					public Tuple2<String, Integer> fold(Tuple2<String, Integer> value1,
+							Tuple2<String, Integer> value2) throws Exception {
+						return null;
+					}
+				});
+
+		fail("exception was not thrown");
+	}
+
+
+	@Test
+	public void testSessionWithFoldFails() throws Exception {
+		// verify that fold does not work with merging windows
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		AllWindowedStream<String, TimeWindow> windowedStream = env.fromElements("Hello", "Ciao")
+				.windowAll(EventTimeSessionWindows.withGap(Time.seconds(5)));
+
+		try {
+			windowedStream.fold("", new FoldFunction<String, String>() {
+				private static final long serialVersionUID = -4567902917104921706L;
+
+				@Override
+				public String fold(String accumulator, String value) throws Exception {
+					return accumulator;
+				}
+			});
+		} catch (UnsupportedOperationException e) {
+			// expected
+			// use a catch to ensure that the exception is thrown by the fold
+			return;
+		}
+
+		fail("The fold call should fail.");
+	}
+
+	@Test
+	public void testMergingAssignerWithNonMergingTriggerFails() throws Exception {
+		// verify that we check for trigger compatibility
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		AllWindowedStream<String, TimeWindow> windowedStream = env.fromElements("Hello", "Ciao")
+				.windowAll(EventTimeSessionWindows.withGap(Time.seconds(5)));
+
+		try {
+			windowedStream.trigger(new Trigger<String, TimeWindow>() {
+				private static final long serialVersionUID = 6558046711583024443L;
+
+				@Override
+				public TriggerResult onElement(String element,
+						long timestamp,
+						TimeWindow window,
+						TriggerContext ctx) throws Exception {
+					return null;
+				}
+
+				@Override
+				public TriggerResult onProcessingTime(long time,
+						TimeWindow window,
+						TriggerContext ctx) throws Exception {
+					return null;
+				}
+
+				@Override
+				public TriggerResult onEventTime(long time,
+						TimeWindow window,
+						TriggerContext ctx) throws Exception {
+					return null;
+				}
+
+				@Override
+				public boolean canMerge() {
+					return false;
+				}
+
+				@Override
+				public void clear(TimeWindow window, TriggerContext ctx) throws Exception {}
+			});
+		} catch (UnsupportedOperationException e) {
+			// expected
+			// use a catch to ensure that the exception is thrown by the trigger call
+			return;
+		}
+
+		fail("The trigger call should fail.");
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testReduceEventTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
-		DummyReducer reducer = new DummyReducer();
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
 
 		DataStream<Tuple2<String, Integer>> window1 = source
 				.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
-				.reduce(reducer);
+				.reduce(new DummyReducer());
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testReduceProcessingTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple2<String, Integer>> window1 = source
+				.windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+				.reduce(new DummyReducer());
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingProcessingTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+
+	/**
+	 * Ignored because we currently don't have the fast processing-time window operator.
+	 */
+	@Test
+	@SuppressWarnings("rawtypes")
+	@Ignore
+	public void testReduceFastProcessingTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple2<String, Integer>> window = source
+				.windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+				.reduce(new DummyReducer());
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof AggregatingProcessingTimeWindowOperator);
+
+		processElementAndEnsureOutput(operator, null, BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testReduceWithWindowFunctionEventTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
-		Assert.assertTrue(operator1 instanceof WindowOperator);
-		WindowOperator winOperator1 = (WindowOperator) operator1;
-		Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
-		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows);
-		Assert.assertTrue(winOperator1.getStateDescriptor() instanceof ReducingStateDescriptor);
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DummyReducer reducer = new DummyReducer();
 
-		DataStream<Tuple2<String, Integer>> window2 = source
+		DataStream<Tuple3<String, String, Integer>> window = source
 				.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
-				.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
+				.reduce(reducer, new AllWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, TimeWindow>() {
 					private static final long serialVersionUID = 1L;
 
 					@Override
 					public void apply(
 							TimeWindow window,
 							Iterable<Tuple2<String, Integer>> values,
-							Collector<Tuple2<String, Integer>> out) throws Exception {
-
+							Collector<Tuple3<String, String, Integer>> out) throws Exception {
+						for (Tuple2<String, Integer> in : values) {
+							out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
+						}
 					}
 				});
 
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
-		Assert.assertTrue(operator2 instanceof WindowOperator);
-		WindowOperator winOperator2 = (WindowOperator) operator2;
-		Assert.assertTrue(winOperator2.getTrigger() instanceof EventTimeTrigger);
-		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingEventTimeWindows);
-		Assert.assertTrue(winOperator2.getStateDescriptor() instanceof ListStateDescriptor);
+		OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
+
+		processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
 	}
 
 	@Test
 	@SuppressWarnings("rawtypes")
-	public void testNonEvicting() throws Exception {
+	public void testReduceWithWindowFunctionProcessingTime() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
 		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
 
+		DataStream<Tuple3<String, String, Integer>> window = source
+				.windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.reduce(new DummyReducer(), new AllWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void apply(
+							TimeWindow window,
+							Iterable<Tuple2<String, Integer>> values,
+							Collector<Tuple3<String, String, Integer>> out) throws Exception {
+						for (Tuple2<String, Integer> in : values) {
+							out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
+						}
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
+
+		processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	/**
+	 * Test for the deprecated .apply(ReduceFunction, AllWindowFunction).
+	 */
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testApplyWithPreReducerEventTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
 		DummyReducer reducer = new DummyReducer();
 
-		DataStream<Tuple2<String, Integer>> window1 = source
+		DataStream<Tuple3<String, String, Integer>> window = source
+				.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.apply(reducer, new AllWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void apply(
+							TimeWindow window,
+							Iterable<Tuple2<String, Integer>> values,
+							Collector<Tuple3<String, String, Integer>> out) throws Exception {
+						for (Tuple2<String, Integer> in : values) {
+							out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
+						}
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
+
+		processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testFoldEventTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple3<String, String, Integer>> window1 = source
 				.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
-				.trigger(CountTrigger.of(100))
-				.reduce(reducer);
+				.fold(new Tuple3<>("", "", 1), new DummyFolder());
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testFoldProcessingTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple3<String, String, Integer>> window = source
+				.windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+				.fold(new Tuple3<>("", "", 0), new DummyFolder());
 
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
-		Assert.assertTrue(operator1 instanceof WindowOperator);
-		WindowOperator winOperator1 = (WindowOperator) operator1;
-		Assert.assertTrue(winOperator1.getTrigger() instanceof CountTrigger);
-		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows);
-		Assert.assertTrue(winOperator1.getStateDescriptor() instanceof ReducingStateDescriptor);
+		OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String,  Integer>>) window.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingProcessingTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
 
-		DataStream<Tuple2<String, Integer>> window2 = source
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testFoldWithWindowFunctionEventTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple2<String, Integer>> window = source
+				.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.fold(new Tuple3<>("", "", 0), new DummyFolder(), new AllWindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void apply(
+							TimeWindow window,
+							Iterable<Tuple3<String, String, Integer>> values,
+							Collector<Tuple2<String, Integer>> out) throws Exception {
+						for (Tuple3<String, String, Integer> in : values) {
+							out.collect(new Tuple2<>(in.f0, in.f2));
+						}
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testFoldWithWindowFunctionProcessingTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple2<String, Integer>> window = source
+				.windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.fold(new Tuple3<>("", "empty", 0), new DummyFolder(), new AllWindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void apply(
+							TimeWindow window,
+							Iterable<Tuple3<String, String, Integer>> values,
+							Collector<Tuple2<String, Integer>> out) throws Exception {
+						for (Tuple3<String, String, Integer> in : values) {
+							out.collect(new Tuple2<>(in.f0, in.f2));
+						}
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testApplyWithPreFolderEventTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple3<String, String, Integer>> window = source
+				.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.apply(new Tuple3<>("", "", 0), new DummyFolder(), new AllWindowFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void apply(
+							TimeWindow window,
+							Iterable<Tuple3<String, String, Integer>> values,
+							Collector<Tuple3<String, String, Integer>> out) throws Exception {
+						for (Tuple3<String, String, Integer> in : values) {
+							out.collect(new Tuple3<>(in.f0, in.f1, in.f2));
+						}
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testApplyEventTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple2<String, Integer>> window1 = source
 				.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
-				.trigger(CountTrigger.of(100))
 				.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
 					private static final long serialVersionUID = 1L;
 
@@ -140,24 +567,65 @@ public class AllWindowTranslationTest {
 							TimeWindow window,
 							Iterable<Tuple2<String, Integer>> values,
 							Collector<Tuple2<String, Integer>> out) throws Exception {
+						for (Tuple2<String, Integer> in : values) {
+							out.collect(in);
+						}
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testApplyProcessingTimeTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple2<String, Integer>> window1 = source
+				.windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
 
+					@Override
+					public void apply(
+							TimeWindow window,
+							Iterable<Tuple2<String, Integer>> values,
+							Collector<Tuple2<String, Integer>> out) throws Exception {
+						for (Tuple2<String, Integer> in : values) {
+							out.collect(in);
+						}
 					}
 				});
 
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
-		Assert.assertTrue(operator2 instanceof WindowOperator);
-		WindowOperator winOperator2 = (WindowOperator) operator2;
-		Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger);
-		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingEventTimeWindows);
-		Assert.assertTrue(winOperator2.getStateDescriptor() instanceof ListStateDescriptor);
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+
 	}
 
+
 	@Test
 	@SuppressWarnings("rawtypes")
-	public void testEvicting() throws Exception {
+	public void testReduceWithCustomTrigger() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
 		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
 
@@ -165,22 +633,56 @@ public class AllWindowTranslationTest {
 
 		DataStream<Tuple2<String, Integer>> window1 = source
 				.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
-				.evictor(CountEvictor.of(100))
+				.trigger(CountTrigger.of(1))
 				.reduce(reducer);
 
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
-		Assert.assertTrue(operator1 instanceof EvictingWindowOperator);
-		EvictingWindowOperator winOperator1 = (EvictingWindowOperator) operator1;
-		Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
-		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows);
-		Assert.assertTrue(winOperator1.getEvictor() instanceof CountEvictor);
-		Assert.assertTrue(winOperator1.getStateDescriptor() instanceof ListStateDescriptor);
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
 
-		DataStream<Tuple2<String, Integer>> window2 = source
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testFoldWithCustomTrigger() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple3<String, String, Integer>> window1 = source
+				.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+				.trigger(CountTrigger.of(1))
+				.fold(new Tuple3<>("", "", 1), new DummyFolder());
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testApplyWithCustomTrigger() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple2<String, Integer>> window1 = source
 				.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
-				.trigger(CountTrigger.of(100))
-				.evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
+				.trigger(CountTrigger.of(1))
 				.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
 					private static final long serialVersionUID = 1L;
 
@@ -189,136 +691,146 @@ public class AllWindowTranslationTest {
 							TimeWindow window,
 							Iterable<Tuple2<String, Integer>> values,
 							Collector<Tuple2<String, Integer>> out) throws Exception {
-
+						for (Tuple2<String, Integer> in : values) {
+							out.collect(in);
+						}
 					}
 				});
 
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
-		Assert.assertTrue(operator2 instanceof EvictingWindowOperator);
-		EvictingWindowOperator winOperator2 = (EvictingWindowOperator) operator2;
-		Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger);
-		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingEventTimeWindows);
-		Assert.assertTrue(winOperator2.getEvictor() instanceof TimeEvictor);
-		Assert.assertTrue(winOperator2.getStateDescriptor() instanceof ListStateDescriptor);
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
 	}
 
-	/**
-	 * These tests ensure that a Fold buffer is used if possible
-	 */
 	@Test
 	@SuppressWarnings("rawtypes")
-	public void testFoldBuffer() throws Exception {
+	public void testReduceWithEvictor() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
 		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
-		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
-		DummyFolder folder = new DummyFolder();
+		DummyReducer reducer = new DummyReducer();
 
-		DataStream<Integer> window1 = source
+		DataStream<Tuple2<String, Integer>> window1 = source
 				.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
-				.fold(0, folder);
+				.evictor(CountEvictor.of(100))
+				.reduce(reducer);
 
-		OneInputTransformation<Tuple2<String, Integer>, Integer> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Integer>) window1.getTransformation();
-		OneInputStreamOperator<Tuple2<String, Integer>, Integer> operator1 = transform1.getOperator();
-		Assert.assertTrue(operator1 instanceof WindowOperator);
-		WindowOperator winOperator1 = (WindowOperator) operator1;
-		Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
-		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows);
-		Assert.assertTrue(winOperator1.getStateDescriptor() instanceof FoldingStateDescriptor);
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof EvictingWindowOperator);
+		EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
+		Assert.assertTrue(winOperator.getEvictor() instanceof CountEvictor);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
 
-		DataStream<Integer> window2 = source
-				.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
-				.evictor(CountEvictor.of(13))
-				.fold(0, folder);
-
-		OneInputTransformation<Tuple2<String, Integer>, Integer> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Integer>) window2.getTransformation();
-		OneInputStreamOperator<Tuple2<String, Integer>, Integer> operator2 = transform2.getOperator();
-		Assert.assertTrue(operator2 instanceof WindowOperator);
-		WindowOperator winOperator2 = (WindowOperator) operator2;
-		Assert.assertTrue(winOperator2.getTrigger() instanceof EventTimeTrigger);
-		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingEventTimeWindows);
-		Assert.assertTrue(winOperator2.getStateDescriptor() instanceof ListStateDescriptor);
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
 	}
 
 	@Test
-	public void testSessionWithFold() throws Exception {
-		// verify that fold does not work with merging windows
+	@SuppressWarnings({"rawtypes", "unchecked"})
+	public void testFoldWithEvictor() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
-		StreamExecutionEnvironment env = LocalStreamEnvironment.createLocalEnvironment();
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
 
-		AllWindowedStream<String, TimeWindow> windowedStream = env.fromElements("Hello", "Ciao")
-				.windowAll(EventTimeSessionWindows.withGap(Time.seconds(5)));
+		DataStream<Tuple3<String, String, Integer>> window1 = source
+				.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+				.evictor(CountEvictor.of(100))
+				.fold(new Tuple3<>("", "", 1), new DummyFolder());
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof EvictingWindowOperator);
+		EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
+		Assert.assertTrue(winOperator.getEvictor() instanceof CountEvictor);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+		winOperator.setOutputType((TypeInformation) window1.getType(), new ExecutionConfig());
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
 
-		try {
-			windowedStream.fold("", new FoldFunction<String, String>() {
-				private static final long serialVersionUID = -8722899157560218917L;
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testApplyWithEvictor() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
-				@Override
-				public String fold(String accumulator, String value) throws Exception {
-					return accumulator;
-				}
-			});
-		} catch (UnsupportedOperationException e) {
-			// expected
-			// use a catch to ensure that the exception is thrown by the fold
-			return;
-		}
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
 
-		fail("The fold call should fail.");
-	}
+		DataStream<Tuple2<String, Integer>> window1 = source
+				.windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.trigger(CountTrigger.of(1))
+				.evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
+				.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
 
-	@Test
-	public void testMergingAssignerWithNonMergingTrigger() throws Exception {
-		// verify that we check for trigger compatibility
+					@Override
+					public void apply(
+							TimeWindow window,
+							Iterable<Tuple2<String, Integer>> values,
+							Collector<Tuple2<String, Integer>> out) throws Exception {
+						for (Tuple2<String, Integer> in : values) {
+							out.collect(in);
+						}
+					}
+				});
 
-		StreamExecutionEnvironment env = LocalStreamEnvironment.createLocalEnvironment();
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof EvictingWindowOperator);
+		EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+		Assert.assertTrue(winOperator.getEvictor() instanceof TimeEvictor);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
 
-		AllWindowedStream<String, TimeWindow> windowedStream = env.fromElements("Hello", "Ciao")
-				.windowAll(EventTimeSessionWindows.withGap(Time.seconds(5)));
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
 
-		try {
-			windowedStream.trigger(new Trigger<String, TimeWindow>() {
-				private static final long serialVersionUID = 8360971631424870421L;
+	/**
+	 * Ensure that we get some output from the given operator when pushing in an element and
+	 * setting watermark and processing time to {@code Long.MAX_VALUE}.
+	 */
+	private static <K, IN, OUT> void processElementAndEnsureOutput(
+			OneInputStreamOperator<IN, OUT> operator,
+			KeySelector<IN, K> keySelector,
+			TypeInformation<K> keyType,
+			IN element) throws Exception {
 
-				@Override
-				public TriggerResult onElement(String element,
-						long timestamp,
-						TimeWindow window,
-						TriggerContext ctx) throws Exception {
-					return null;
-				}
+		KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(
+						operator,
+						keySelector,
+						keyType);
 
-				@Override
-				public TriggerResult onProcessingTime(long time,
-						TimeWindow window,
-						TriggerContext ctx) throws Exception {
-					return null;
-				}
+		testHarness.open();
 
-				@Override
-				public TriggerResult onEventTime(long time,
-						TimeWindow window,
-						TriggerContext ctx) throws Exception {
-					return null;
-				}
+		testHarness.setProcessingTime(0);
+		testHarness.processWatermark(Long.MIN_VALUE);
 
-				@Override
-				public boolean canMerge() {
-					return false;
-				}
+		testHarness.processElement(new StreamRecord<>(element, 0));
 
-				@Override
-				public void clear(TimeWindow window, TriggerContext ctx) throws Exception {}
-			});
-		} catch (UnsupportedOperationException e) {
-			// expected
-			// use a catch to ensure that the exception is thrown by the fold
-			return;
-		}
+		// provoke any processing-time/event-time triggers
+		testHarness.setProcessingTime(Long.MAX_VALUE);
+		testHarness.processWatermark(Long.MAX_VALUE);
 
-		fail("The trigger call should fail.");
+		// we at least get the two watermarks and should also see an output element
+		assertTrue(testHarness.getOutput().size() >= 3);
+
+		testHarness.close();
 	}
 
 	// ------------------------------------------------------------------------
@@ -334,13 +846,12 @@ public class AllWindowTranslationTest {
 		}
 	}
 
-	public static class DummyFolder implements FoldFunction<Tuple2<String, Integer>, Integer> {
-		private static final long serialVersionUID = 1L;
-
+	private static class DummyFolder implements FoldFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>> {
 		@Override
-		public Integer fold(Integer accumulator, Tuple2<String, Integer> value) throws Exception {
+		public Tuple3<String, String, Integer> fold(
+				Tuple3<String, String, Integer> accumulator,
+				Tuple2<String, Integer> value) throws Exception {
 			return accumulator;
 		}
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/aa220e48/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
index 5aa8151..8e37021 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,7 +17,9 @@
  */
 package org.apache.flink.streaming.runtime.operators.windowing;
 
+import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.java.tuple.Tuple;
@@ -44,9 +46,8 @@ import org.junit.Test;
 import java.util.concurrent.TimeUnit;
 
 /**
- * These tests verify that the api calls on
- * {@link WindowedStream} instantiate
- * the correct window operator.
+ * These tests verify that the API calls on {@link WindowedStream} that use the "time" shortcut
+ * instantiate the correct window operator.
  */
 public class TimeWindowTranslationTest {
 
@@ -56,8 +57,9 @@ public class TimeWindowTranslationTest {
 	 */
 	@Test
 	@Ignore
-	public void testFastTimeWindows() throws Exception {
+	public void testReduceFastTimeWindows() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
 		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
 
@@ -71,8 +73,21 @@ public class TimeWindowTranslationTest {
 		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
 		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
 		Assert.assertTrue(operator1 instanceof AggregatingProcessingTimeWindowOperator);
+	}
 
-		DataStream<Tuple2<String, Integer>> window2 = source
+	/**
+	 * This test ensures that the fast aligned time windows operator is used for
+	 * {@code apply()} if the conditions are right.
+	 */
+	@Test
+	@Ignore
+	public void testApplyFastTimeWindows() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple2<String, Integer>> window1 = source
 				.keyBy(0)
 				.timeWindow(Time.of(1000, TimeUnit.MILLISECONDS))
 				.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
@@ -87,25 +102,25 @@ public class TimeWindowTranslationTest {
 					}
 				});
 
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
-		Assert.assertTrue(operator2 instanceof AccumulatingProcessingTimeWindowOperator);
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
+		Assert.assertTrue(operator1 instanceof AccumulatingProcessingTimeWindowOperator);
 	}
 
 	@Test
 	@SuppressWarnings("rawtypes")
-	public void testEventTimeWindows() throws Exception {
+	public void testReduceEventTimeWindows() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
-		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
-
-		DummyReducer reducer = new DummyReducer();
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(
+				Tuple2.of("hello", 1),
+				Tuple2.of("hello", 2));
 
 		DataStream<Tuple2<String, Integer>> window1 = source
-			.keyBy(0)
-			.timeWindow(Time.of(1000, TimeUnit.MILLISECONDS), Time.of(100, TimeUnit.MILLISECONDS))
-			.reduce(reducer);
+				.keyBy(0)
+				.timeWindow(Time.of(1000, TimeUnit.MILLISECONDS),Time.of(100, TimeUnit.MILLISECONDS))
+				.reduce(new DummyReducer());
 
 		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
 		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
@@ -114,8 +129,43 @@ public class TimeWindowTranslationTest {
 		Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
 		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows);
 		Assert.assertTrue(winOperator1.getStateDescriptor() instanceof ReducingStateDescriptor);
+	}
 
-		DataStream<Tuple2<String, Integer>> window2 = source
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testFoldEventTimeWindows() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(
+				Tuple2.of("hello", 1),
+				Tuple2.of("hello", 2));
+
+		DataStream<Tuple2<String, Integer>> window1 = source
+				.keyBy(0)
+				.timeWindow(Time.of(1000, TimeUnit.MILLISECONDS),Time.of(100, TimeUnit.MILLISECONDS))
+				.fold(new Tuple2<>("", 1), new DummyFolder());
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
+		Assert.assertTrue(operator1 instanceof WindowOperator);
+		WindowOperator winOperator1 = (WindowOperator) operator1;
+		Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingEventTimeWindows);
+		Assert.assertTrue(winOperator1.getStateDescriptor() instanceof FoldingStateDescriptor);
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testApplyEventTimeWindows() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(
+				Tuple2.of("hello", 1),
+				Tuple2.of("hello", 2));
+
+		DataStream<Tuple2<String, Integer>> window1 = source
 			.keyBy(0)
 			.timeWindow(Time.of(1000, TimeUnit.MILLISECONDS))
 			.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
@@ -130,14 +180,13 @@ public class TimeWindowTranslationTest {
 				}
 			});
 
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
-		Assert.assertTrue(operator2 instanceof WindowOperator);
-		WindowOperator winOperator2 = (WindowOperator) operator2;
-		Assert.assertTrue(winOperator2.getTrigger() instanceof EventTimeTrigger);
-		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingEventTimeWindows);
-		Assert.assertTrue(winOperator2.getStateDescriptor() instanceof ListStateDescriptor);
-
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
+		Assert.assertTrue(operator1 instanceof WindowOperator);
+		WindowOperator winOperator1 = (WindowOperator) operator1;
+		Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof TumblingEventTimeWindows);
+		Assert.assertTrue(winOperator1.getStateDescriptor() instanceof ListStateDescriptor);
 	}
 
 	/**
@@ -187,7 +236,7 @@ public class TimeWindowTranslationTest {
 	//  UDFs
 	// ------------------------------------------------------------------------
 
-	public static class DummyReducer implements ReduceFunction<Tuple2<String, Integer>> {
+	private static class DummyReducer implements ReduceFunction<Tuple2<String, Integer>> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -195,4 +244,15 @@ public class TimeWindowTranslationTest {
 			return value1;
 		}
 	}
+
+	private static class DummyFolder
+			implements FoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple2<String, Integer> fold(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
+			return value1;
+		}
+	}
+
 }