You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/02/17 16:24:12 UTC

[2/8] flink git commit: [FLINK-5237] Consolidate and harmonize Window Translation Tests

[FLINK-5237] Consolidate and harmonize Window Translation Tests


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5368a7d3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5368a7d3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5368a7d3

Branch: refs/heads/master
Commit: 5368a7d32d96beb1b8298b87d9ea6d42ea306947
Parents: fe2a301
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Nov 24 08:14:48 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Feb 17 17:15:51 2017 +0100

----------------------------------------------------------------------
 .../operators/StateDescriptorPassingTest.java   |  26 +
 .../windowing/WindowTranslationTest.java        | 718 +++++++++++++++--
 .../ScalaProcessWindowFunctionWrapper.scala     |  16 +-
 .../api/scala/WindowTranslationTest.scala       | 766 +++++++++++++++++--
 4 files changed, 1420 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5368a7d3/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
index 26cb7ac..813ca96 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.windowing.time.Time;
@@ -136,6 +137,31 @@ public class StateDescriptorPassingTest {
 	}
 
 	@Test
+	public void testProcessWindowState() throws Exception {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+		env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);
+
+		DataStream<File> src = env.fromElements(new File("/"));
+
+		SingleOutputStreamOperator<?> result = src
+				.keyBy(new KeySelector<File, String>() {
+					@Override
+					public String getKey(File value) {
+						return null;
+					}
+				})
+				.timeWindow(Time.milliseconds(1000))
+				.process(new ProcessWindowFunction<File, String, String, TimeWindow>() {
+					@Override
+					public void process(String s, Context ctx,
+							Iterable<File> input, Collector<String> out) {}
+				});
+
+		validateListStateDescriptorConfigured(result);
+	}
+
+	@Test
 	public void testFoldWindowAllState() throws Exception {
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

http://git-wip-us.apache.org/repos/asf/flink/blob/5368a7d3/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
index f72a2f1..b899948 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
@@ -31,14 +31,17 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.WindowedStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
@@ -112,7 +115,7 @@ public class WindowTranslationTest {
 	 * in a {@code AggregatingState}.
 	 */
 	@Test(expected = UnsupportedOperationException.class)
-	public void testAgrgegateWithRichFunctionFails() throws Exception {
+	public void testAggregateWithRichFunctionFails() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
@@ -405,6 +408,82 @@ public class WindowTranslationTest {
 		processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
 	}
 
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testReduceWithProcesWindowFunctionEventTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DummyReducer reducer = new DummyReducer();
+
+		DataStream<Tuple3<String, String, Integer>> window = source
+				.keyBy(new TupleKeySelector())
+				.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.reduce(reducer, new ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void process(String key,
+							Context ctx,
+							Iterable<Tuple2<String, Integer>> values,
+							Collector<Tuple3<String, String, Integer>> out) throws Exception {
+						for (Tuple2<String, Integer> in : values) {
+							out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
+						}
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
+
+		processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testReduceWithProcessWindowFunctionProcessingTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple3<String, String, Integer>> window = source
+				.keyBy(new TupleKeySelector())
+				.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.reduce(new DummyReducer(), new ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void process(String tuple,
+							Context ctx,
+							Iterable<Tuple2<String, Integer>> values,
+							Collector<Tuple3<String, String, Integer>> out) throws Exception {
+						for (Tuple2<String, Integer> in : values) {
+							out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
+						}
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
+
+		processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
 	/**
 	 * Test for the deprecated .apply(Reducer, WindowFunction).
 	 */
@@ -447,6 +526,50 @@ public class WindowTranslationTest {
 		processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
 	}
 
+	/**
+	 * Test for the deprecated .apply(Reducer, WindowFunction) on a window with an evictor.
+	 */
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testApplyWithPreReducerAndEvictor() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DummyReducer reducer = new DummyReducer();
+
+		DataStream<Tuple3<String, String, Integer>> window = source
+				.keyBy(new TupleKeySelector())
+				.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.evictor(CountEvictor.of(100))
+				.apply(reducer, new WindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void apply(String key,
+							TimeWindow window,
+							Iterable<Tuple2<String, Integer>> values,
+							Collector<Tuple3<String, String, Integer>> out) throws Exception {
+						for (Tuple2<String, Integer> in : values) {
+							out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
+						}
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+		processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+
 	// ------------------------------------------------------------------------
 	//  Aggregate Translation Tests
 	// ------------------------------------------------------------------------
@@ -463,13 +586,13 @@ public class WindowTranslationTest {
 				.window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
 				.aggregate(new DummyAggregationFunction());
 
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = 
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
 				(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
 
 		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
 
 		Assert.assertTrue(operator instanceof WindowOperator);
-		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = 
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
 				(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
 
 		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
@@ -492,13 +615,13 @@ public class WindowTranslationTest {
 				.window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
 				.aggregate(new DummyAggregationFunction());
 
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = 
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
 				(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
 
 		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
 
 		Assert.assertTrue(operator instanceof WindowOperator);
-		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = 
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
 				(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
 
 		Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
@@ -529,7 +652,7 @@ public class WindowTranslationTest {
 		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
 
 		Assert.assertTrue(operator instanceof WindowOperator);
-		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = 
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
 				(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
 
 		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
@@ -569,6 +692,66 @@ public class WindowTranslationTest {
 				operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
 	}
 
+	@Test
+	public void testAggregateWithProcessWindowFunctionEventTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DummyReducer reducer = new DummyReducer();
+
+		DataStream<Tuple3<String, String, Integer>> window = source
+				.keyBy(new TupleKeySelector())
+				.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.aggregate(new DummyAggregationFunction(), new TestProcessWindowFunction());
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
+				(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);
+
+		processElementAndEnsureOutput(
+				operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	public void testAggregateWithProcessWindowFunctionProcessingTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple3<String, String, Integer>> window = source
+				.keyBy(new TupleKeySelector())
+				.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.aggregate(new DummyAggregationFunction(), new TestProcessWindowFunction());
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
+				(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+
+		Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);
+
+		processElementAndEnsureOutput(
+				operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
 	// ------------------------------------------------------------------------
 	//  Fold Translation Tests
 	// ------------------------------------------------------------------------
@@ -664,83 +847,269 @@ public class WindowTranslationTest {
 	@SuppressWarnings("rawtypes")
 	public void testFoldWithWindowFunctionProcessingTime() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple2<String, Integer>> window = source
+				.keyBy(new TupleKeySelector())
+				.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.fold(new Tuple3<>("", "empty", 0), new DummyFolder(), new WindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void apply(String key,
+							TimeWindow window,
+							Iterable<Tuple3<String, String, Integer>> values,
+							Collector<Tuple2<String, Integer>> out) throws Exception {
+						for (Tuple3<String, String, Integer> in : values) {
+							out.collect(new Tuple2<>(in.f0, in.f2));
+						}
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testFoldWithProcessWindowFunctionEventTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple2<String, Integer>> window = source
+				.keyBy(new TupleKeySelector())
+				.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.fold(new Tuple3<>("", "", 0), new DummyFolder(), new ProcessWindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void process(String key,
+							Context ctx,
+							Iterable<Tuple3<String, String, Integer>> values,
+							Collector<Tuple2<String, Integer>> out) throws Exception {
+						for (Tuple3<String, String, Integer> in : values) {
+							out.collect(new Tuple2<>(in.f0, in.f2));
+						}
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testFoldWithProcessWindowFunctionProcessingTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple2<String, Integer>> window = source
+				.keyBy(new TupleKeySelector())
+				.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.fold(new Tuple3<>("", "empty", 0), new DummyFolder(), new ProcessWindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void process(String key,
+							Context ctx,
+							Iterable<Tuple3<String, String, Integer>> values,
+							Collector<Tuple2<String, Integer>> out) throws Exception {
+						for (Tuple3<String, String, Integer> in : values) {
+							out.collect(new Tuple2<>(in.f0, in.f2));
+						}
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testApplyWithPreFolderEventTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple3<String, String, Integer>> window = source
+				.keyBy(new TupleKeySelector())
+				.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.apply(new Tuple3<>("", "", 0), new DummyFolder(), new WindowFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void apply(String key,
+							TimeWindow window,
+							Iterable<Tuple3<String, String, Integer>> values,
+							Collector<Tuple3<String, String, Integer>> out) throws Exception {
+						for (Tuple3<String, String, Integer> in : values) {
+							out.collect(new Tuple3<>(in.f0, in. f1, in.f2));
+						}
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String,String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testApplyWithPreFolderAndEvictor() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple3<String, String, Integer>> window = source
+				.keyBy(new TupleKeySelector())
+				.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.evictor(CountEvictor.of(100))
+				.apply(new Tuple3<>("", "", 0), new DummyFolder(), new WindowFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void apply(String key,
+							TimeWindow window,
+							Iterable<Tuple3<String, String, Integer>> values,
+							Collector<Tuple3<String, String, Integer>> out) throws Exception {
+						for (Tuple3<String, String, Integer> in : values) {
+							out.collect(new Tuple3<>(in.f0, in. f1, in.f2));
+						}
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String,String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+
+	// ------------------------------------------------------------------------
+	//  Apply Translation Tests
+	// ------------------------------------------------------------------------
+
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testApplyEventTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
 		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
 
-		DataStream<Tuple2<String, Integer>> window = source
+		DataStream<Tuple2<String, Integer>> window1 = source
 				.keyBy(new TupleKeySelector())
-				.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
-				.fold(new Tuple3<>("", "empty", 0), new DummyFolder(), new WindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+				.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
 					private static final long serialVersionUID = 1L;
 
 					@Override
 					public void apply(String key,
 							TimeWindow window,
-							Iterable<Tuple3<String, String, Integer>> values,
+							Iterable<Tuple2<String, Integer>> values,
 							Collector<Tuple2<String, Integer>> out) throws Exception {
-						for (Tuple3<String, String, Integer> in : values) {
-							out.collect(new Tuple2<>(in.f0, in.f2));
+						for (Tuple2<String, Integer> in : values) {
+							out.collect(in);
 						}
 					}
 				});
 
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
-				(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation();
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
 		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
 		Assert.assertTrue(operator instanceof WindowOperator);
 		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
-		Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
-		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
-		Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
 
 		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
 	}
 
 	@Test
 	@SuppressWarnings("rawtypes")
-	public void testApplyWithPreFolderEventTime() throws Exception {
+	public void testApplyProcessingTime() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
 		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
 
-		DataStream<Tuple3<String, String, Integer>> window = source
+		DataStream<Tuple2<String, Integer>> window1 = source
 				.keyBy(new TupleKeySelector())
-				.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
-				.apply(new Tuple3<>("", "", 0), new DummyFolder(), new WindowFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>() {
+				.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
 					private static final long serialVersionUID = 1L;
 
 					@Override
 					public void apply(String key,
 							TimeWindow window,
-							Iterable<Tuple3<String, String, Integer>> values,
-							Collector<Tuple3<String, String, Integer>> out) throws Exception {
-						for (Tuple3<String, String, Integer> in : values) {
-							out.collect(new Tuple3<>(in.f0, in. f1, in.f2));
+							Iterable<Tuple2<String, Integer>> values,
+							Collector<Tuple2<String, Integer>> out) throws Exception {
+						for (Tuple2<String, Integer> in : values) {
+							out.collect(in);
 						}
 					}
 				});
 
-		OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
-				(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String,String, Integer>> operator = transform.getOperator();
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
 		Assert.assertTrue(operator instanceof WindowOperator);
 		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
-		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
-		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
-		Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
+		Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
 
 		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
-	}
 
-	// ------------------------------------------------------------------------
-	//  Apply Translation Tests
-	// ------------------------------------------------------------------------
+	}
 
 	@Test
 	@SuppressWarnings("rawtypes")
-	public void testApplyEventTime() throws Exception {
+	public void testProcessEventTime() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
@@ -749,12 +1118,12 @@ public class WindowTranslationTest {
 		DataStream<Tuple2<String, Integer>> window1 = source
 				.keyBy(new TupleKeySelector())
 				.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
-				.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+				.process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
 					private static final long serialVersionUID = 1L;
 
 					@Override
-					public void apply(String key,
-							TimeWindow window,
+					public void process(String key,
+							Context ctx,
 							Iterable<Tuple2<String, Integer>> values,
 							Collector<Tuple2<String, Integer>> out) throws Exception {
 						for (Tuple2<String, Integer> in : values) {
@@ -776,7 +1145,7 @@ public class WindowTranslationTest {
 
 	@Test
 	@SuppressWarnings("rawtypes")
-	public void testApplyProcessingTimeTime() throws Exception {
+	public void testProcessProcessingTime() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
@@ -785,12 +1154,12 @@ public class WindowTranslationTest {
 		DataStream<Tuple2<String, Integer>> window1 = source
 				.keyBy(new TupleKeySelector())
 				.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
-				.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+				.process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
 					private static final long serialVersionUID = 1L;
 
 					@Override
-					public void apply(String key,
-							TimeWindow window,
+					public void process(String key,
+							Context ctx,
 							Iterable<Tuple2<String, Integer>> values,
 							Collector<Tuple2<String, Integer>> out) throws Exception {
 						for (Tuple2<String, Integer> in : values) {
@@ -903,6 +1272,43 @@ public class WindowTranslationTest {
 
 	@Test
 	@SuppressWarnings("rawtypes")
+	public void testProcessWithCustomTrigger() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple2<String, Integer>> window1 = source
+				.keyBy(new TupleKeySelector())
+				.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.trigger(CountTrigger.of(1))
+				.process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void process(String key,
+							Context ctx,
+							Iterable<Tuple2<String, Integer>> values,
+							Collector<Tuple2<String, Integer>> out) throws Exception {
+						for (Tuple2<String, Integer> in : values) {
+							out.collect(in);
+						}
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	@SuppressWarnings("rawtypes")
 	public void testReduceWithEvictor() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
@@ -930,6 +1336,121 @@ public class WindowTranslationTest {
 	}
 
 	@Test
+	@SuppressWarnings("rawtypes")
+	public void testReduceWithEvictorAndProcessFunction() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DummyReducer reducer = new DummyReducer();
+
+		DataStream<Tuple2<String, Integer>> window1 = source
+				.keyBy(0)
+				.window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+				.evictor(CountEvictor.of(100))
+				.reduce(
+						reducer,
+						new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
+							@Override
+							public void process(
+									Tuple tuple,
+									Context context,
+									Iterable<Tuple2<String, Integer>> elements,
+									Collector<Tuple2<String, Integer>> out) throws Exception {
+								for (Tuple2<String, Integer> in : elements) {
+									out.collect(in);
+								}
+							}
+						});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof EvictingWindowOperator);
+		EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getEvictor() instanceof CountEvictor);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	public void testAggregateWithEvictor() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple2<String, Integer>> window1 = source
+				.keyBy(new TupleKeySelector())
+				.window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+				.evictor(CountEvictor.of(100))
+				.aggregate(new DummyAggregationFunction());
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
+				(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+		processElementAndEnsureOutput(
+				winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
+	public void testAggregateWithEvictorAndProcessFunction() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple2<String, Integer>> window1 = source
+				.keyBy(new TupleKeySelector())
+				.window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+				.evictor(CountEvictor.of(100))
+				.aggregate(
+						new DummyAggregationFunction(),
+						new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+							@Override
+							public void process(
+									String s,
+									Context context,
+									Iterable<Tuple2<String, Integer>> elements,
+									Collector<Tuple2<String, Integer>> out) throws Exception {
+								for (Tuple2<String, Integer> in : elements) {
+									out.collect(in);
+								}
+							}
+						});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+
+		Assert.assertTrue(operator instanceof WindowOperator);
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
+				(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+		processElementAndEnsureOutput(
+				winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+
+	@Test
 	@SuppressWarnings({"rawtypes", "unchecked"})
 	public void testFoldWithEvictor() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -958,6 +1479,48 @@ public class WindowTranslationTest {
 	}
 
 	@Test
+	@SuppressWarnings({"rawtypes", "unchecked"})
+	public void testFoldWithEvictorAndProcessFunction() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple3<String, String, Integer>> window1 = source
+				.keyBy(0)
+				.window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+				.evictor(CountEvictor.of(100))
+				.fold(
+						new Tuple3<>("", "", 1),
+						new DummyFolder(),
+						new ProcessWindowFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, Tuple, TimeWindow>() {
+							@Override
+							public void process(
+									Tuple tuple,
+									Context context,
+									Iterable<Tuple3<String, String, Integer>> elements,
+									Collector<Tuple3<String, String, Integer>> out) throws Exception {
+								for (Tuple3<String, String, Integer> in : elements) {
+									out.collect(in);
+								}
+							}
+						});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+				(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof EvictingWindowOperator);
+		EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+		Assert.assertTrue(winOperator.getEvictor() instanceof CountEvictor);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+		winOperator.setOutputType((TypeInformation) window1.getType(), new ExecutionConfig());
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
+	@Test
 	@SuppressWarnings("rawtypes")
 	public void testApplyWithEvictor() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -996,6 +1559,45 @@ public class WindowTranslationTest {
 		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
 	}
 
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testProcessWithEvictor() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+		DataStream<Tuple2<String, Integer>> window1 = source
+				.keyBy(new TupleKeySelector())
+				.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.trigger(CountTrigger.of(1))
+				.evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
+				.process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void process(String key,
+							Context ctx,
+							Iterable<Tuple2<String, Integer>> values,
+							Collector<Tuple2<String, Integer>> out) throws Exception {
+						for (Tuple2<String, Integer> in : values) {
+							out.collect(in);
+						}
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		Assert.assertTrue(operator instanceof EvictingWindowOperator);
+		EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator;
+		Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger);
+		Assert.assertTrue(winOperator.getEvictor() instanceof TimeEvictor);
+		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+	}
+
 	/**
 	 * Ensure that we get some output from the given operator when pushing in an element and
 	 * setting watermark and processing time to {@code Long.MAX_VALUE}.
@@ -1012,6 +1614,12 @@ public class WindowTranslationTest {
 						keySelector,
 						keyType);
 
+		if (operator instanceof OutputTypeConfigurable) {
+			// use a dummy type since window functions just need the ExecutionConfig
+			// this is also only needed for Fold, which we're getting rid of soon.
+			((OutputTypeConfigurable) operator).setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig());
+		}
+
 		testHarness.open();
 
 		testHarness.setProcessingTime(0);
@@ -1050,7 +1658,7 @@ public class WindowTranslationTest {
 		}
 	}
 
-	private static class DummyAggregationFunction 
+	private static class DummyAggregationFunction
 			implements AggregateFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>> {
 
 		@Override
@@ -1096,7 +1704,7 @@ public class WindowTranslationTest {
 		}
 	}
 
-	private static class TestWindowFunction 
+	private static class TestWindowFunction
 			implements WindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow> {
 
 		@Override
@@ -1111,6 +1719,22 @@ public class WindowTranslationTest {
 		}
 	}
 
+	private static class TestProcessWindowFunction
+			extends ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow> {
+
+		@Override
+		public void process(String key,
+				Context ctx,
+				Iterable<Tuple2<String, Integer>> values,
+				Collector<Tuple3<String, String, Integer>> out) throws Exception {
+
+			for (Tuple2<String, Integer> in : values) {
+				out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
+			}
+		}
+	}
+
+
 	private static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/5368a7d3/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
index 4a20371..23293a6 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
@@ -18,8 +18,6 @@
 
 package org.apache.flink.streaming.api.scala.function.util
 
-import org.apache.flink.api.common.functions.{IterationRuntimeContext, RuntimeContext}
-import org.apache.flink.api.java.operators.translation.WrappingFunction
 import org.apache.flink.streaming.api.functions.windowing.{ProcessWindowFunction => JProcessWindowFunction}
 import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
 import org.apache.flink.streaming.api.windowing.windows.Window
@@ -37,8 +35,7 @@ import scala.collection.JavaConverters._
   */
 final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window](
     private[this] val func: ProcessWindowFunction[IN, OUT, KEY, W])
-    extends WrappingFunction[ProcessWindowFunction[IN, OUT, KEY, W]](func)
-    with JProcessWindowFunctionTrait[IN, OUT, KEY, W] {
+    extends JProcessWindowFunction[IN, OUT, KEY, W] {
 
   override def process(
       key: KEY,
@@ -50,15 +47,4 @@ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window](
     }
     func.process(key, ctx, elements.asScala, out)
   }
-
-  override def getRuntimeContext: RuntimeContext = {
-    throw new RuntimeException("This should never be called")
-  }
-
-  override def getIterationRuntimeContext: IterationRuntimeContext = {
-    throw new RuntimeException("This should never be called")
-  }
 }
-
-private trait JProcessWindowFunctionTrait[IN, OUT, KEY, W]
-  extends JProcessWindowFunction[IN, OUT, KEY, W]