You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/03/25 16:01:36 UTC

[1/3] flink git commit: [FLINK-5929] Allow Access to Per-Window State in ProcessWindowFunction

Repository: flink
Updated Branches:
  refs/heads/master 5c37e55c8 -> 662ed33d8


http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
index d4fefa2..6f34607 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
@@ -21,9 +21,11 @@ package org.apache.flink.streaming.api.operators.windowing.functions;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.util.functions.StreamingFunctionUtils;
 import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichProcessAllWindowFunction;
@@ -31,6 +33,7 @@ import org.apache.flink.streaming.api.functions.windowing.RichProcessWindowFunct
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalAggregateProcessAllWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalAggregateProcessWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction;
@@ -41,6 +44,7 @@ import org.apache.flink.streaming.runtime.operators.windowing.functions.Internal
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessAllWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
 import org.apache.flink.util.Collector;
 import org.hamcrest.collection.IsIterableContainingInOrder;
 import org.junit.Test;
@@ -56,7 +60,11 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.collection.IsMapContaining.hasEntry;
 import static org.hamcrest.core.AllOf.allOf;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.anyObject;
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 
 public class InternalWindowFunctionTest {
 
@@ -93,7 +101,9 @@ public class InternalWindowFunctionTest {
 		Iterable<Long> i = (Iterable<Long>)mock(Iterable.class);
 		Collector<String> c = (Collector<String>) mock(Collector.class);
 
-		windowFunction.apply(((byte)0), w, i, c);
+		InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
+
+		windowFunction.process(((byte)0), w, ctx, i, c);
 		verify(mock).apply(w, i, c);
 
 		// check close
@@ -134,7 +144,8 @@ public class InternalWindowFunctionTest {
 		Iterable<Long> i = (Iterable<Long>)mock(Iterable.class);
 		Collector<String> c = (Collector<String>) mock(Collector.class);
 
-		windowFunction.apply(((byte)0), w, i, c);
+		InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
+		windowFunction.process(((byte)0), w, ctx, i, c);
 		verify(mock).process((ProcessAllWindowFunctionMock.Context) anyObject(), eq(i), eq(c));
 
 		// check close
@@ -175,7 +186,8 @@ public class InternalWindowFunctionTest {
 		Iterable<Long> i = (Iterable<Long>)mock(Iterable.class);
 		Collector<String> c = (Collector<String>) mock(Collector.class);
 
-		windowFunction.apply(42L, w, i, c);
+		InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
+		windowFunction.process(42L, w, ctx, i, c);
 		verify(mock).apply(eq(42L), eq(w), eq(i), eq(c));
 
 		// check close
@@ -215,8 +227,9 @@ public class InternalWindowFunctionTest {
 		TimeWindow w = mock(TimeWindow.class);
 		Iterable<Long> i = (Iterable<Long>)mock(Iterable.class);
 		Collector<String> c = (Collector<String>) mock(Collector.class);
+		InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
 
-		windowFunction.apply(42L, w, i, c);
+		windowFunction.process(42L, w, ctx, i, c);
 		verify(mock).process(eq(42L), (ProcessWindowFunctionMock.Context) anyObject(), eq(i), eq(c));
 
 		// check close
@@ -256,8 +269,9 @@ public class InternalWindowFunctionTest {
 		// check apply
 		TimeWindow w = mock(TimeWindow.class);
 		Collector<String> c = (Collector<String>) mock(Collector.class);
+		InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
 
-		windowFunction.apply(42L, w, 23L, c);
+		windowFunction.process(42L, w, ctx, 23L, c);
 		verify(mock).apply(eq(42L), eq(w), (Iterable<Long>)argThat(IsIterableContainingInOrder.contains(23L)), eq(c));
 
 		// check close
@@ -297,8 +311,9 @@ public class InternalWindowFunctionTest {
 		// check apply
 		TimeWindow w = mock(TimeWindow.class);
 		Collector<String> c = (Collector<String>) mock(Collector.class);
+		InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
 
-		windowFunction.apply(((byte)0), w, 23L, c);
+		windowFunction.process(((byte)0), w, ctx, 23L, c);
 		verify(mock).apply(eq(w), (Iterable<Long>)argThat(IsIterableContainingInOrder.contains(23L)), eq(c));
 
 		// check close
@@ -338,8 +353,9 @@ public class InternalWindowFunctionTest {
 		// check apply
 		TimeWindow w = mock(TimeWindow.class);
 		Collector<String> c = (Collector<String>) mock(Collector.class);
+		InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
 
-		windowFunction.apply(((byte)0), w, 23L, c);
+		windowFunction.process(((byte)0), w, ctx, 23L, c);
 		verify(mock).process((ProcessAllWindowFunctionMock.Context) anyObject(), (Iterable<Long>)argThat(IsIterableContainingInOrder.contains(23L)), eq(c));
 
 		// check close
@@ -378,8 +394,9 @@ public class InternalWindowFunctionTest {
 		// check apply
 		TimeWindow w = mock(TimeWindow.class);
 		Collector<String> c = (Collector<String>) mock(Collector.class);
+		InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
 
-		windowFunction.apply(42L, w, 23L, c);
+		windowFunction.process(42L, w, ctx,23L, c);
 		verify(mock).process(eq(42L), (ProcessWindowFunctionMock.Context) anyObject(), (Iterable<Long>)argThat(IsIterableContainingInOrder.contains(23L)), eq(c));
 
 		// check close
@@ -450,8 +467,9 @@ public class InternalWindowFunctionTest {
 		List<Long> args = new LinkedList<>();
 		args.add(23L);
 		args.add(24L);
+		InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
 
-		windowFunction.apply(42L, w, args, c);
+		windowFunction.process(42L, w, ctx, args, c);
 		verify(mock).process(
 				eq(42L),
 				(AggregateProcessWindowFunctionMock.Context) anyObject(),
@@ -528,8 +546,9 @@ public class InternalWindowFunctionTest {
 		List<Long> args = new LinkedList<>();
 		args.add(23L);
 		args.add(24L);
+		InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
 
-		windowFunction.apply(((byte)0), w, args, c);
+		windowFunction.process(((byte)0), w, ctx, args, c);
 		verify(mock).process(
 				(AggregateProcessAllWindowFunctionMock.Context) anyObject(),
 				(Iterable) argThat(containsInAnyOrder(allOf(
@@ -552,7 +571,9 @@ public class InternalWindowFunctionTest {
 		public void setOutputType(TypeInformation<String> outTypeInfo, ExecutionConfig executionConfig) { }
 
 		@Override
-		public void process(Long aLong, Context context, Iterable<Long> input, Collector<String> out) throws Exception { }
+		public void process(Long aLong, ProcessWindowFunction<Long, String, Long, TimeWindow>.Context context, Iterable<Long> elements, Collector<String> out) throws Exception {
+
+		}
 	}
 
 	public static class AggregateProcessWindowFunctionMock
@@ -565,7 +586,9 @@ public class InternalWindowFunctionTest {
 		public void setOutputType(TypeInformation<String> outTypeInfo, ExecutionConfig executionConfig) { }
 
 		@Override
-		public void process(Long aLong, Context context, Iterable<Map<Long, Long>> input, Collector<String> out) throws Exception { }
+		public void process(Long aLong, ProcessWindowFunction<Map<Long, Long>, String, Long, TimeWindow>.Context context, Iterable<Map<Long, Long>> elements, Collector<String> out) throws Exception {
+
+		}
 	}
 
 	public static class AggregateProcessAllWindowFunctionMock

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java
index 11508c5..ff1cbdf 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java
@@ -118,9 +118,9 @@ public class RegularWindowOperatorContractTest extends WindowOperatorContractTes
 
 		testHarness.processElement(new StreamRecord<>(1, 0L));
 
-		verify(mockWindowFunction, times(2)).apply(eq(1), anyTimeWindow(), anyInt(), WindowOperatorContractTest.<Void>anyCollector());
-		verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(0, 2)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
-		verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(2, 4)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(2)).process(eq(1), anyTimeWindow(), anyInternalWindowContext(), anyInt(), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(1)).process(eq(1), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), eq(3), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(1)).process(eq(1), eq(new TimeWindow(2, 4)), anyInternalWindowContext(), eq(3), WindowOperatorContractTest.<Void>anyCollector());
 
 		// clear is only called at cleanup time/GC time
 		verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
@@ -182,9 +182,9 @@ public class RegularWindowOperatorContractTest extends WindowOperatorContractTes
 
 		testHarness.processElement(new StreamRecord<>(1, 0L));
 
-		verify(mockWindowFunction, times(2)).apply(eq(1), anyTimeWindow(), anyInt(), WindowOperatorContractTest.<Void>anyCollector());
-		verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(0, 2)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
-		verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(2, 4)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(2)).process(eq(1), anyTimeWindow(), anyInternalWindowContext(), anyInt(), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(1)).process(eq(1), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), eq(3), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(1)).process(eq(1), eq(new TimeWindow(2, 4)), anyInternalWindowContext(), eq(3), WindowOperatorContractTest.<Void>anyCollector());
 
 		// clear is only called at cleanup time/GC time
 		verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
index 8aae46a..faab505 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
@@ -163,6 +163,10 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 		return Mockito.any();
 	}
 
+	static InternalWindowFunction.InternalWindowContext anyInternalWindowContext() {
+		return Mockito.any();
+	}
+
 	static Trigger.OnMergeContext anyOnMergeContext() {
 		return Mockito.any();
 	}
@@ -408,9 +412,9 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 
 		testHarness.processElement(new StreamRecord<>(0, 0L));
 
-		verify(mockWindowFunction, times(2)).apply(eq(0), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
-		verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
-		verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(2, 4)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(2)).process(eq(0), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(1)).process(eq(0), eq((new TimeWindow(0, 2))), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(2, 4)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
 	}
 
 	@Test
@@ -455,9 +459,9 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 
 		testHarness.processElement(new StreamRecord<>(0, 0L));
 
-		verify(mockWindowFunction, times(2)).apply(anyInt(), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
-		verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0, 0), WindowOperatorContractTest.<Void>anyCollector());
-		verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(0, 1)), intIterable(1, 1), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(2)).process(anyInt(), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0, 0), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(1)).process(eq(1), eq(new TimeWindow(0, 1)), anyInternalWindowContext(), intIterable(1, 1), WindowOperatorContractTest.<Void>anyCollector());
 	}
 
 	@Test
@@ -509,16 +513,16 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 			@Override
 			public Void answer(InvocationOnMock invocation) throws Exception {
 				@SuppressWarnings("unchecked")
-				Collector<String> out = invocation.getArgumentAt(3, Collector.class);
+				Collector<String> out = invocation.getArgumentAt(4, Collector.class);
 				out.collect("Hallo");
 				out.collect("Ciao");
 				return null;
 			}
-		}).when(mockWindowFunction).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<String>anyCollector());
+		}).when(mockWindowFunction).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<String>anyCollector());
 
 		testHarness.processElement(new StreamRecord<>(0, 0L));
 
-		verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<String>anyCollector());
+		verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<String>anyCollector());
 
 		assertThat(testHarness.extractOutputStreamRecords(),
 				contains(isStreamRecord("Hallo", 1L), isStreamRecord("Ciao", 1L)));
@@ -553,25 +557,25 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 			@Override
 			public Void answer(InvocationOnMock invocation) throws Exception {
 				@SuppressWarnings("unchecked")
-				Collector<String> out = invocation.getArgumentAt(3, Collector.class);
+				Collector<String> out = invocation.getArgumentAt(4, Collector.class);
 				out.collect("Hallo");
 				out.collect("Ciao");
 				return null;
 			}
-		}).when(mockWindowFunction).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<String>anyCollector());
+		}).when(mockWindowFunction).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<String>anyCollector());
 
 		timeAdaptor.shouldRegisterTimerOnElement(mockTrigger, 1);
 
 		testHarness.processElement(new StreamRecord<>(0, 0L));
 
-		verify(mockWindowFunction, never()).apply(anyInt(), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<String>anyCollector());
+		verify(mockWindowFunction, never()).process(anyInt(), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<String>anyCollector());
 		assertTrue(testHarness.extractOutputStreamRecords().isEmpty());
 
 		timeAdaptor.shouldFireOnTime(mockTrigger);
 
 		timeAdaptor.advanceTime(testHarness, 1L);
 
-		verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<String>anyCollector());
+		verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<String>anyCollector());
 
 		assertThat(testHarness.extractOutputStreamRecords(),
 				contains(isStreamRecord("Hallo", 1L), isStreamRecord("Ciao", 1L)));
@@ -650,9 +654,9 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 
 		testHarness.processElement(new StreamRecord<>(0, 0L));
 
-		verify(mockWindowFunction, times(2)).apply(eq(0), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
-		verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
-		verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(2, 4)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(2)).process(eq(0), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(2, 4)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
 
 		// clear is only called at cleanup time/GC time
 		verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
@@ -693,9 +697,9 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 
 		testHarness.processElement(new StreamRecord<>(0, 0L));
 
-		verify(mockWindowFunction, times(2)).apply(eq(0), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
-		verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
-		verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(2, 4)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(2)).process(eq(0), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(2, 4)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
 
 		// clear is only called at cleanup time/GC time
 		verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
@@ -858,9 +862,9 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 
 		timeAdaptor.advanceTime(testHarness, 0L);
 
-		verify(mockWindowFunction, times(2)).apply(eq(0), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
-		verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
-		verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(2, 4)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(2)).process(eq(0), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(2, 4)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
 
 		// clear is only called at cleanup time/GC time
 		verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
@@ -919,9 +923,9 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 
 		timeAdaptor.advanceTime(testHarness, 0L);
 
-		verify(mockWindowFunction, times(2)).apply(eq(0), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
-		verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
-		verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(2, 4)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(2)).process(eq(0), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(2, 4)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
 
 		// clear is only called at cleanup time/GC time
 		verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
@@ -1050,7 +1054,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 		timeAdaptor.verifyTriggerCallback(mockTrigger, never(), null, null);
 
 		verify(mockWindowFunction, never())
-				.apply(anyInt(), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<List<Integer>>anyCollector());
+				.process(anyInt(), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<List<Integer>>anyCollector());
 
 		assertEquals(1, timeAdaptor.numTimers(testHarness)); // only gc timers left
 	}
@@ -1114,7 +1118,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 		timeAdaptor.verifyTriggerCallback(mockTrigger, never(), null, null);
 
 		verify(mockWindowFunction, never())
-				.apply(anyInt(), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<List<Integer>>anyCollector());
+				.process(anyInt(), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<List<Integer>>anyCollector());
 
 		assertEquals(1, timeAdaptor.numTimers(testHarness)); // only gc timers left
 	}
@@ -1186,7 +1190,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 		timeAdaptor.verifyTriggerCallback(mockTrigger, times(1), null, null);
 
 		verify(mockWindowFunction, never())
-				.apply(anyInt(), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<List<Integer>>anyCollector());
+				.process(anyInt(), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<List<Integer>>anyCollector());
 
 		// now we trigger the dangling timer
 		timeAdaptor.advanceTime(testHarness, 10L);
@@ -2208,7 +2212,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 
 		testHarness.processElement(new StreamRecord<>(0, 0L));
 
-		verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
 
 		// clear is only called at cleanup time/GC time
 		verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
@@ -2326,9 +2330,9 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 
 		verify(mockTrigger, times(2)).clear(anyTimeWindow(), anyTriggerContext());
 
-		verify(mockWindowFunction, times(2)).apply(eq(0), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
-		verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
-		verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(2, 4)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(2)).process(eq(0), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(2, 4)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
 
 		// it's also called for the cleanup timers
 		verify(mockTrigger, times(4)).onEventTime(anyLong(), anyTimeWindow(), anyTriggerContext());
@@ -2339,6 +2343,96 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 		assertEquals(0, testHarness.numEventTimeTimers());
 	}
 
+	@Test
+	public void testPerWindowStateSetAndClearedOnEventTimePurge() throws Exception {
+		testPerWindowStateSetAndClearedOnPurge(new EventTimeAdaptor());
+	}
+
+	@Test
+	public void testPerWindowStateSetAndClearedOnProcessingTimePurge() throws Exception {
+		testPerWindowStateSetAndClearedOnPurge(new ProcessingTimeAdaptor());
+	}
+
+	public void testPerWindowStateSetAndClearedOnPurge(TimeDomainAdaptor timeAdaptor) throws Exception {
+		WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
+		timeAdaptor.setIsEventTime(mockAssigner);
+		Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+		InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
+			createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction);
+
+		testHarness.open();
+
+		when(mockTrigger.onElement(anyInt(), anyLong(), anyTimeWindow(), anyTriggerContext()))
+			.thenReturn(TriggerResult.FIRE);
+
+		when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+			.thenReturn(Arrays.asList(new TimeWindow(0, 20)));
+
+		doAnswer(new Answer<Object>() {
+			@Override
+			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+				InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[2];
+				context.windowState().getState(valueStateDescriptor).update("hello");
+				return null;
+			}
+		}).when(mockWindowFunction).process(anyInt(), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
+
+		doAnswer(new Answer<Object>() {
+			@Override
+			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+				InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[1];
+				context.windowState().getState(valueStateDescriptor).clear();
+				return null;
+			}
+		}).when(mockWindowFunction).clear(anyTimeWindow(), anyInternalWindowContext());
+
+		assertEquals(0, testHarness.getOutput().size());
+		assertEquals(0, testHarness.numKeyedStateEntries());
+
+		testHarness.processElement(new StreamRecord<>(0, 0L));
+
+		assertEquals(2, testHarness.numKeyedStateEntries()); // window contents plus value state
+		assertEquals(1, timeAdaptor.numTimers(testHarness)); // gc timers
+
+		timeAdaptor.advanceTime(testHarness, 19 + 20); // 19 is maxTime of the window
+
+		assertEquals(0, testHarness.numKeyedStateEntries());
+		assertEquals(0, timeAdaptor.numTimers(testHarness));
+	}
+
+	@Test
+	public void testWindowStateNotAvailableToMergingWindows() throws Exception {
+		WindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner();
+		Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+		InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
+			createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction);
+
+		testHarness.open();
+
+		when(mockTrigger.onElement(anyInt(), anyLong(), anyTimeWindow(), anyTriggerContext()))
+			.thenReturn(TriggerResult.FIRE);
+
+		when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+			.thenReturn(Arrays.asList(new TimeWindow(0, 20)));
+
+		doAnswer(new Answer<Object>() {
+			@Override
+			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+				InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[2];
+				context.windowState().getState(valueStateDescriptor).update("hello");
+				return null;
+			}
+		}).when(mockWindowFunction).process(anyInt(), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
+
+		expectedException.expect(UnsupportedOperationException.class);
+		expectedException.expectMessage("Per-window state is not allowed when using merging windows.");
+		testHarness.processElement(new StreamRecord<>(0, 0L));
+	}
+
 	protected abstract <W extends Window, OUT> KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> createWindowOperator(
 			WindowAssigner<Integer, W> assigner,
 			Trigger<Integer, W> trigger,

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
index 163117b..2f0e48e 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
@@ -22,6 +22,7 @@ import java.io.Serializable
 
 import org.apache.flink.annotation.PublicEvolving
 import org.apache.flink.api.common.functions.Function
+import org.apache.flink.api.common.state.KeyedStateStore
 import org.apache.flink.streaming.api.windowing.windows.Window
 import org.apache.flink.util.Collector
 
@@ -47,6 +48,15 @@ abstract class ProcessAllWindowFunction[IN, OUT, W <: Window] extends Function w
   def process(context: Context, elements: Iterable[IN], out: Collector[OUT])
 
   /**
+    * Deletes any state in the [[Context]] when the Window is purged.
+    *
+    * @param context The context to which the window is being evaluated
+    * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+    */
+  @throws[Exception]
+  def clear(context: Context) {}
+
+  /**
     * The context holding window metadata
     */
   abstract class Context {
@@ -54,6 +64,16 @@ abstract class ProcessAllWindowFunction[IN, OUT, W <: Window] extends Function w
       * @return The window that is being evaluated.
       */
     def window: W
+
+    /**
+      * State accessor for per-key and per-window state.
+      */
+    def windowState: KeyedStateStore
+
+    /**
+      * State accessor for per-key global state.
+      */
+    def globalState: KeyedStateStore
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
index 79f3918..bdf6ae6 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
@@ -22,6 +22,7 @@ import java.io.Serializable
 
 import org.apache.flink.annotation.PublicEvolving
 import org.apache.flink.api.common.functions.Function
+import org.apache.flink.api.common.state.KeyedStateStore
 import org.apache.flink.streaming.api.windowing.windows.Window
 import org.apache.flink.util.Collector
 
@@ -49,6 +50,15 @@ abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function
   def process(key: KEY, context: Context, elements: Iterable[IN], out: Collector[OUT])
 
   /**
+    * Deletes any state in the [[Context]] when the Window is purged.
+    *
+    * @param context The context to which the window is being evaluated
+    * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+    */
+  @throws[Exception]
+  def clear(context: Context) {}
+
+  /**
     * The context holding window metadata
     */
   abstract class Context {
@@ -56,6 +66,16 @@ abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function
       * @return The window that is being evaluated.
       */
     def window: W
+
+    /**
+      * State accessor for per-key and per-window state.
+      */
+    def windowState: KeyedStateStore
+
+    /**
+      * State accessor for per-key global state.
+      */
+    def globalState: KeyedStateStore
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
index a4fec64..fac5958 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
@@ -52,10 +52,25 @@ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window](
       out: Collector[OUT]): Unit = {
     val ctx = new func.Context {
       override def window = context.window
+
+      override def windowState = context.windowState()
+
+      override def globalState = context.globalState()
     }
     func.process(key, ctx, elements.asScala, out)
   }
 
+  override def clear(context: JProcessWindowFunction[IN, OUT, KEY, W]#Context): Unit = {
+    val ctx = new func.Context {
+      override def window = context.window
+
+      override def windowState = context.windowState()
+
+      override def globalState = context.globalState()
+    }
+    func.clear(ctx)
+  }
+
   override def setRuntimeContext(t: RuntimeContext): Unit = {
     super.setRuntimeContext(t)
     func match {
@@ -99,10 +114,26 @@ final class ScalaProcessAllWindowFunctionWrapper[IN, OUT, W <: Window](
       out: Collector[OUT]): Unit = {
     val ctx = new func.Context {
       override def window = context.window
+
+      override def windowState = context.windowState()
+
+      override def globalState = context.globalState()
     }
     func.process(ctx, elements.asScala, out)
   }
 
+  override def clear(context: JProcessAllWindowFunction[IN, OUT, W]#Context): Unit = {
+    val ctx = new func.Context {
+      override def window = context.window
+
+      override def windowState = context.windowState()
+
+      override def globalState = context.globalState()
+    }
+    func.clear(ctx)
+  }
+
+
   override def setRuntimeContext(t: RuntimeContext): Unit = {
     super.setRuntimeContext(t)
     func match {


[2/3] flink git commit: [FLINK-5929] Allow Access to Per-Window State in ProcessWindowFunction

Posted by al...@apache.org.
[FLINK-5929] Allow Access to Per-Window State in ProcessWindowFunction


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fad201bf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fad201bf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fad201bf

Branch: refs/heads/master
Commit: fad201bfb0b1f2757f68f7b3ffaf97a486eb93e8
Parents: 5c37e55
Author: Seth Wiesman <sw...@mediamath.com>
Authored: Sun Mar 5 23:07:18 2017 -0500
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Sat Mar 25 16:59:17 2017 +0100

----------------------------------------------------------------------
 .../FoldApplyProcessAllWindowFunction.java      |  23 +-
 .../FoldApplyProcessWindowFunction.java         |  23 +-
 .../InternalProcessApplyAllWindowContext.java   |  57 +++++
 .../InternalProcessApplyWindowContext.java      |  58 +++++
 .../windowing/ProcessAllWindowFunction.java     |  22 ++
 .../windowing/ProcessWindowFunction.java        |  24 +-
 .../ReduceApplyProcessAllWindowFunction.java    |  23 +-
 .../ReduceApplyProcessWindowFunction.java       |  21 +-
 .../windowing/AccumulatingKeyedTimePanes.java   |  75 ++++++-
 .../windowing/EvictingWindowOperator.java       |  62 +++---
 .../operators/windowing/WindowOperator.java     | 220 +++++++++++++++----
 ...ternalAggregateProcessAllWindowFunction.java |  28 ++-
 .../InternalAggregateProcessWindowFunction.java |  28 ++-
 .../InternalIterableAllWindowFunction.java      |   7 +-
 ...nternalIterableProcessAllWindowFunction.java |  31 ++-
 .../InternalIterableProcessWindowFunction.java  |  24 +-
 .../InternalIterableWindowFunction.java         |   7 +-
 .../InternalProcessAllWindowContext.java        |  57 +++++
 .../functions/InternalProcessWindowContext.java |  58 +++++
 .../InternalSingleValueAllWindowFunction.java   |   7 +-
 ...rnalSingleValueProcessAllWindowFunction.java |  29 ++-
 ...nternalSingleValueProcessWindowFunction.java |  24 +-
 .../InternalSingleValueWindowFunction.java      |   7 +-
 .../functions/InternalWindowFunction.java       |  26 ++-
 .../FoldApplyProcessWindowFunctionTest.java     |  82 ++++++-
 .../functions/InternalWindowFunctionTest.java   |  49 +++--
 .../RegularWindowOperatorContractTest.java      |  12 +-
 .../windowing/WindowOperatorContractTest.java   | 158 ++++++++++---
 .../function/ProcessAllWindowFunction.scala     |  20 ++
 .../scala/function/ProcessWindowFunction.scala  |  20 ++
 .../ScalaProcessWindowFunctionWrapper.scala     |  31 +++
 31 files changed, 1091 insertions(+), 222 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
index 5ac6766..8e8e52c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
@@ -50,6 +50,7 @@ public class FoldApplyProcessAllWindowFunction<W extends Window, T, ACC, R>
 	private TypeSerializer<ACC> accSerializer;
 	private final TypeInformation<ACC> accTypeInformation;
 	private transient ACC initialValue;
+	private transient InternalProcessApplyAllWindowContext<ACC, R, W> ctx;
 
 	public FoldApplyProcessAllWindowFunction(ACC initialValue, FoldFunction<T, ACC> foldFunction, ProcessAllWindowFunction<ACC, R, W> windowFunction, TypeInformation<ACC> accTypeInformation) {
 		this.windowFunction = windowFunction;
@@ -70,6 +71,9 @@ public class FoldApplyProcessAllWindowFunction<W extends Window, T, ACC, R>
 		ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue);
 		DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais);
 		initialValue = accSerializer.deserialize(in);
+
+		ctx = new InternalProcessApplyAllWindowContext<>(windowFunction);
+
 	}
 
 	@Override
@@ -92,12 +96,19 @@ public class FoldApplyProcessAllWindowFunction<W extends Window, T, ACC, R>
 			result = foldFunction.fold(result, val);
 		}
 
-		windowFunction.process(windowFunction.new Context() {
-			@Override
-			public W window() {
-				return context.window();
-			}
-		}, Collections.singletonList(result), out);
+		this.ctx.window = context.window();
+		this.ctx.windowState = context.windowState();
+		this.ctx.globalState = context.globalState();
+
+		windowFunction.process(ctx, Collections.singletonList(result), out);
+	}
+
+	@Override
+	public void clear(final Context context) throws Exception {
+		this.ctx.window = context.window();
+		this.ctx.windowState = context.windowState();
+		this.ctx.globalState = context.globalState();
+		windowFunction.clear(ctx);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
index e1bc759..073a2f3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
@@ -50,6 +50,7 @@ public class FoldApplyProcessWindowFunction<K, W extends Window, T, ACC, R>
 	private TypeSerializer<ACC> accSerializer;
 	private final TypeInformation<ACC> accTypeInformation;
 	private transient ACC initialValue;
+	private transient InternalProcessApplyWindowContext<ACC, R, K, W> ctx;
 
 	public FoldApplyProcessWindowFunction(ACC initialValue, FoldFunction<T, ACC> foldFunction, ProcessWindowFunction<ACC, R, K, W> windowFunction, TypeInformation<ACC> accTypeInformation) {
 		this.windowFunction = windowFunction;
@@ -70,6 +71,8 @@ public class FoldApplyProcessWindowFunction<K, W extends Window, T, ACC, R>
 		ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue);
 		DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais);
 		initialValue = accSerializer.deserialize(in);
+
+		ctx = new InternalProcessApplyWindowContext<>(windowFunction);
 	}
 
 	@Override
@@ -85,19 +88,25 @@ public class FoldApplyProcessWindowFunction<K, W extends Window, T, ACC, R>
 	}
 
 	@Override
-	public void process(K key, final Context context, Iterable<T> values, Collector<R> out) throws Exception {
+	public void process(K key, Context context, Iterable<T> values, Collector<R> out) throws Exception {
 		ACC result = accSerializer.copy(initialValue);
 
 		for (T val : values) {
 			result = foldFunction.fold(result, val);
 		}
 
-		windowFunction.process(key, windowFunction.new Context() {
-			@Override
-			public W window() {
-				return context.window();
-			}
-		}, Collections.singletonList(result), out);
+		this.ctx.window = context.window();
+		this.ctx.windowState = context.windowState();
+		this.ctx.globalState = context.globalState();
+		windowFunction.process(key, ctx, Collections.singletonList(result), out);
+	}
+
+	@Override
+	public void clear(final Context context) throws Exception{
+		this.ctx.window = context.window();
+		this.ctx.windowState = context.windowState();
+		this.ctx.globalState = context.globalState();
+		windowFunction.clear(ctx);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyAllWindowContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyAllWindowContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyAllWindowContext.java
new file mode 100644
index 0000000..e1a0a98
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyAllWindowContext.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * Internal reusable context wrapper.
+ *
+ * @param <IN> The type of the input value.
+ * @param <OUT> The type of the output value.
+ * @param <W> The type of the window.
+ */
+@Internal
+public class InternalProcessApplyAllWindowContext<IN, OUT, W extends Window>
+	extends ProcessAllWindowFunction<IN, OUT, W>.Context {
+
+	W window;
+	KeyedStateStore windowState;
+	KeyedStateStore globalState;
+
+	InternalProcessApplyAllWindowContext(ProcessAllWindowFunction<IN, OUT, W> function) {
+		function.super();
+	}
+
+	@Override
+	public W window() {
+		return window;
+	}
+
+	@Override
+	public KeyedStateStore windowState() {
+		return windowState;
+	}
+
+	@Override
+	public KeyedStateStore globalState() {
+		return globalState;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java
new file mode 100644
index 0000000..f547adc
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * Internal reusable context wrapper.
+ *
+ * @param <IN> The type of the input value.
+ * @param <OUT> The type of the output value.
+ * @param <KEY> The type of the window key. 
+ * @param <W> The type of the window.
+ */
+@Internal
+public class InternalProcessApplyWindowContext<IN, OUT, KEY, W extends Window>
+	extends ProcessWindowFunction<IN, OUT, KEY, W>.Context {
+
+	W window;
+	KeyedStateStore windowState;
+	KeyedStateStore globalState;
+
+	InternalProcessApplyWindowContext(ProcessWindowFunction<IN, OUT, KEY, W> function) {
+		function.super();
+	}
+
+	@Override
+	public W window() {
+		return window;
+	}
+
+	@Override
+	public KeyedStateStore windowState() {
+		return windowState;
+	}
+
+	@Override
+	public KeyedStateStore globalState() {
+		return globalState;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
index 622e020..f49aa27 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.functions.windowing;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
@@ -48,6 +49,14 @@ public abstract class ProcessAllWindowFunction<IN, OUT, W extends Window> implem
 	public abstract void process(Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
 
 	/**
+	 * Deletes any state in the {@code Context} when the Window is purged.
+	 *
+	 * @param context The context to which the window is being evaluated
+	 * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+	 */
+	public void clear(Context context) throws Exception {}
+
+	/**
 	 * The context holding window metadata
 	 */
 	public abstract class Context {
@@ -55,5 +64,18 @@ public abstract class ProcessAllWindowFunction<IN, OUT, W extends Window> implem
 		 * @return The window that is being evaluated.
 		 */
 		public abstract W window();
+
+		/**
+		 * State accessor for per-key and per-window state.
+		 *
+		 * <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up
+		 * by implementing {@link ProcessWindowFunction#clear(ProcessWindowFunction.Context)}.
+		 */
+		public abstract KeyedStateStore windowState();
+
+		/**
+		 * State accessor for per-key global state.
+		 */
+		public abstract KeyedStateStore globalState();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
index 9c48e24..bcefaf7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.functions.windowing;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
@@ -50,12 +51,33 @@ public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> impl
 	public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
 
 	/**
+	 * Deletes any state in the {@code Context} when the Window is purged.
+	 *
+	 * @param context The context to which the window is being evaluated
+	 * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+	 */
+	public void clear(Context context) throws Exception {}
+
+	/**
 	 * The context holding window metadata
 	 */
-	public abstract class Context {
+	public abstract class Context implements java.io.Serializable {
 		/**
 		 * @return The window that is being evaluated.
 		 */
 		public abstract W window();
+
+		/**
+		 * State accessor for per-key and per-window state.
+		 *
+		 * <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up
+		 * by implementing {@link ProcessWindowFunction#clear(Context)}.
+		 */
+		public abstract KeyedStateStore windowState();
+
+		/**
+		 * State accessor for per-key global state.
+		 */
+		public abstract KeyedStateStore globalState();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
index 142c71e..4c54c94 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
@@ -35,6 +35,7 @@ public class ReduceApplyProcessAllWindowFunction<W extends Window, T, R>
 
 	private final ReduceFunction<T> reduceFunction;
 	private final ProcessAllWindowFunction<T, R, W> windowFunction;
+	private transient InternalProcessApplyAllWindowContext<T, R, W> ctx;
 
 	public ReduceApplyProcessAllWindowFunction(ReduceFunction<T> reduceFunction, ProcessAllWindowFunction<T, R, W> windowFunction) {
 		this.windowFunction = windowFunction;
@@ -52,17 +53,27 @@ public class ReduceApplyProcessAllWindowFunction<W extends Window, T, R>
 				curr = reduceFunction.reduce(curr, val);
 			}
 		}
-		windowFunction.process(windowFunction.new Context() {
-			@Override
-			public W window() {
-				return context.window();
-			}
-		}, Collections.singletonList(curr), out);
+
+		this.ctx.window = context.window();
+		this.ctx.windowState = context.windowState();
+		this.ctx.globalState = context.globalState();
+
+		windowFunction.process(ctx, Collections.singletonList(curr), out);
+	}
+
+	@Override
+	public void clear(final Context context) throws Exception {
+		this.ctx.window = context.window();
+		this.ctx.windowState = context.windowState();
+		this.ctx.globalState = context.globalState();
+
+		windowFunction.clear(ctx);
 	}
 
 	@Override
 	public void open(Configuration configuration) throws Exception {
 		FunctionUtils.openFunction(this.windowFunction, configuration);
+		ctx = new InternalProcessApplyAllWindowContext<>(windowFunction);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
index 9ea1fdf..1af783a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
@@ -35,6 +35,7 @@ public class ReduceApplyProcessWindowFunction<K, W extends Window, T, R>
 
 	private final ReduceFunction<T> reduceFunction;
 	private final ProcessWindowFunction<T, R, K, W> windowFunction;
+	private transient InternalProcessApplyWindowContext<T, R, K, W> ctx;
 
 	public ReduceApplyProcessWindowFunction(ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> windowFunction) {
 		this.windowFunction = windowFunction;
@@ -52,17 +53,25 @@ public class ReduceApplyProcessWindowFunction<K, W extends Window, T, R>
 				curr = reduceFunction.reduce(curr, val);
 			}
 		}
-		windowFunction.process(k, windowFunction.new Context() {
-			@Override
-			public W window() {
-				return context.window();
-			}
-		}, Collections.singletonList(curr), out);
+
+		this.ctx.window = context.window();
+		this.ctx.windowState = context.windowState();
+		this.ctx.globalState = context.globalState();
+		windowFunction.process(k, ctx, Collections.singletonList(curr), out);
+	}
+
+	@Override
+	public void clear(final Context context) throws Exception {
+		this.ctx.window = context.window();
+		this.ctx.windowState = context.windowState();
+		this.ctx.globalState = context.globalState();
+		windowFunction.clear(ctx);
 	}
 
 	@Override
 	public void open(Configuration configuration) throws Exception {
 		FunctionUtils.openFunction(this.windowFunction, configuration);
+		ctx = new InternalProcessApplyWindowContext<>(windowFunction);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
index 87c5aca..d58b5cc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
@@ -19,6 +19,17 @@
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
 import org.apache.flink.util.UnionIterator;
@@ -38,6 +49,8 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
 
 	private final InternalWindowFunction<Iterable<Type>, Result, Key, Window> function;
 
+	private final AccumulatingKeyedTimePanesContext context;
+
 	/**
 	 * IMPORTANT: This value needs to start at one, so it is fresher than the value that new entries have (zero) */
 	private long evaluationPass = 1L;   
@@ -47,6 +60,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
 	public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, InternalWindowFunction<Iterable<Type>, Result, Key, Window> function) {
 		this.keySelector = keySelector;
 		this.function = function;
+		this.context = new AccumulatingKeyedTimePanesContext();
 	}
 
 	// ------------------------------------------------------------------------
@@ -67,13 +81,15 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
 			for (KeyMap.Entry<Key, ArrayList<Type>> entry : latestPane) {
 				Key key = entry.getKey();
 				operator.setCurrentKey(key);
-				function.apply(entry.getKey(), window, entry.getValue(), out);
+				context.globalState = operator.getKeyedStateStore();
+
+				function.process(entry.getKey(), window, context, entry.getValue(), out);
 			}
 		}
 		else {
 			// general code path for multi-pane case
 			WindowFunctionTraversal<Key, Type, Result> evaluator = new WindowFunctionTraversal<>(
-					function, window, out, operator);
+					function, window, out, operator, context);
 			traverseAllPanes(evaluator, evaluationPass);
 		}
 		
@@ -95,17 +111,19 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
 		private final TimeWindow window;
 		
 		private final AbstractStreamOperator<Result> contextOperator;
-		
+
 		private Key currentKey;
 		
+		private AccumulatingKeyedTimePanesContext context;
 
 		WindowFunctionTraversal(InternalWindowFunction<Iterable<Type>, Result, Key, Window> function, TimeWindow window,
-								Collector<Result> out, AbstractStreamOperator<Result> contextOperator) {
+								Collector<Result> out, AbstractStreamOperator<Result> contextOperator, AccumulatingKeyedTimePanesContext context) {
 			this.function = function;
 			this.out = out;
 			this.unionIterator = new UnionIterator<>();
 			this.window = window;
 			this.contextOperator = contextOperator;
+			this.context = context;
 		}
 
 
@@ -123,7 +141,8 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
 		@Override
 		public void keyDone() throws Exception {
 			contextOperator.setCurrentKey(currentKey);
-			function.apply(currentKey, window, unionIterator, out);
+			context.globalState = contextOperator.getKeyedStateStore();
+			function.process(currentKey, window, context, unionIterator, out);
 		}
 	}
 	
@@ -136,6 +155,52 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
 		return (KeyMap.LazyFactory<ArrayList<V>>) LIST_FACTORY;
 	}
 
+	private static class ThrowingKeyedStateStore implements KeyedStateStore {
+		@Override
+		public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
+			throw new UnsupportedOperationException("Per-window state is not supported when using aligned processing-time windows.");
+		}
+
+		@Override
+		public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
+			throw new UnsupportedOperationException("Per-window state is not supported when using aligned processing-time windows.");
+		}
+
+		@Override
+		public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
+			throw new UnsupportedOperationException("Per-window state is not supported when using aligned processing-time windows.");
+		}
+
+		@Override
+		public <T, A> FoldingState<T, A> getFoldingState(FoldingStateDescriptor<T, A> stateProperties) {
+			throw new UnsupportedOperationException("Per-window state is not supported when using aligned processing-time windows.");
+		}
+
+		@Override
+		public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
+			throw new UnsupportedOperationException("Per-window state is not supported when using aligned processing-time windows.");
+		}
+	}
+
+	private static class AccumulatingKeyedTimePanesContext implements InternalWindowFunction.InternalWindowContext {
+		KeyedStateStore globalState;
+		KeyedStateStore throwingStore;
+
+		public AccumulatingKeyedTimePanesContext() {
+			this.throwingStore = new ThrowingKeyedStateStore();
+		}
+
+		@Override
+		public KeyedStateStore windowState() {
+			return throwingStore;
+		}
+
+		@Override
+		public KeyedStateStore globalState() {
+			return globalState;
+		}
+	}
+
 	private static final KeyMap.LazyFactory<?> LIST_FACTORY = new KeyMap.LazyFactory<ArrayList<?>>() {
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 24c8d32..85451a5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -134,14 +134,14 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
 											" window: " + mergeResult);
 								}
 
-								context.key = key;
-								context.window = mergeResult;
+								triggerContext.key = key;
+								triggerContext.window = mergeResult;
 
-								context.onMerge(mergedWindows);
+								triggerContext.onMerge(mergedWindows);
 
 								for (W m : mergedWindows) {
-									context.window = m;
-									context.clear();
+									triggerContext.window = m;
+									triggerContext.clear();
 									deleteCleanupTimer(m);
 								}
 
@@ -165,12 +165,12 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
 				evictingWindowState.setCurrentNamespace(stateWindow);
 				evictingWindowState.add(element);
 
-				context.key = key;
-				context.window = actualWindow;
+				triggerContext.key = key;
+				triggerContext.window = actualWindow;
 				evictorContext.key = key;
 				evictorContext.window = actualWindow;
 
-				TriggerResult triggerResult = context.onElement(element);
+				TriggerResult triggerResult = triggerContext.onElement(element);
 
 				if (triggerResult.isFire()) {
 					Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
@@ -201,12 +201,12 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
 				evictingWindowState.setCurrentNamespace(window);
 				evictingWindowState.add(element);
 
-				context.key = key;
-				context.window = window;
+				triggerContext.key = key;
+				triggerContext.window = window;
 				evictorContext.key = key;
 				evictorContext.window = window;
 
-				TriggerResult triggerResult = context.onElement(element);
+				TriggerResult triggerResult = triggerContext.onElement(element);
 
 				if (triggerResult.isFire()) {
 					Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
@@ -236,8 +236,8 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
 	@Override
 	public void onEventTime(InternalTimer<K, W> timer) throws Exception {
 
-		context.key = timer.getKey();
-		context.window = timer.getNamespace();
+		triggerContext.key = timer.getKey();
+		triggerContext.window = timer.getNamespace();
 		evictorContext.key = timer.getKey();
 		evictorContext.window = timer.getNamespace();
 
@@ -245,7 +245,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
 
 		if (windowAssigner instanceof MergingWindowAssigner) {
 			mergingWindows = getMergingWindowSet();
-			W stateWindow = mergingWindows.getStateWindow(context.window);
+			W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
 			if (stateWindow == null) {
 				// Timer firing for non-existent window, this can only happen if a
 				// trigger did not clean up timers. We have already cleared the merging
@@ -255,23 +255,23 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
 				evictingWindowState.setCurrentNamespace(stateWindow);
 			}
 		} else {
-			evictingWindowState.setCurrentNamespace(context.window);
+			evictingWindowState.setCurrentNamespace(triggerContext.window);
 		}
 
 		Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
 
 		if (contents != null) {
-			TriggerResult triggerResult = context.onEventTime(timer.getTimestamp());
+			TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp());
 			if (triggerResult.isFire()) {
-				emitWindowContents(context.window, contents, evictingWindowState);
+				emitWindowContents(triggerContext.window, contents, evictingWindowState);
 			}
 			if (triggerResult.isPurge()) {
 				evictingWindowState.clear();
 			}
 		}
 
-		if (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp())) {
-			clearAllState(context.window, evictingWindowState, mergingWindows);
+		if (windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
+			clearAllState(triggerContext.window, evictingWindowState, mergingWindows);
 		}
 
 		if (mergingWindows != null) {
@@ -282,8 +282,8 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
 
 	@Override
 	public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
-		context.key = timer.getKey();
-		context.window = timer.getNamespace();
+		triggerContext.key = timer.getKey();
+		triggerContext.window = timer.getNamespace();
 		evictorContext.key = timer.getKey();
 		evictorContext.window = timer.getNamespace();
 
@@ -291,7 +291,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
 
 		if (windowAssigner instanceof MergingWindowAssigner) {
 			mergingWindows = getMergingWindowSet();
-			W stateWindow = mergingWindows.getStateWindow(context.window);
+			W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
 			if (stateWindow == null) {
 				// Timer firing for non-existent window, this can only happen if a
 				// trigger did not clean up timers. We have already cleared the merging
@@ -301,23 +301,23 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
 				evictingWindowState.setCurrentNamespace(stateWindow);
 			}
 		} else {
-			evictingWindowState.setCurrentNamespace(context.window);
+			evictingWindowState.setCurrentNamespace(triggerContext.window);
 		}
 
 		Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
 
 		if (contents != null) {
-			TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp());
+			TriggerResult triggerResult = triggerContext.onProcessingTime(timer.getTimestamp());
 			if (triggerResult.isFire()) {
-				emitWindowContents(context.window, contents, evictingWindowState);
+				emitWindowContents(triggerContext.window, contents, evictingWindowState);
 			}
 			if (triggerResult.isPurge()) {
 				evictingWindowState.clear();
 			}
 		}
 
-		if (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp())) {
-			clearAllState(context.window, evictingWindowState, mergingWindows);
+		if (!windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
+			clearAllState(triggerContext.window, evictingWindowState, mergingWindows);
 		}
 
 		if (mergingWindows != null) {
@@ -348,7 +348,8 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
 				}
 			});
 
-		userFunction.apply(context.key, context.window, projectedContents, timestampedCollector);
+		processContext.window = triggerContext.window;
+		userFunction.process(triggerContext.key, triggerContext.window, processContext, projectedContents, timestampedCollector);
 		evictorContext.evictAfter(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));
 
 
@@ -364,9 +365,10 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
 			W window,
 			ListState<StreamRecord<IN>> windowState,
 			MergingWindowSet<W> mergingWindows) throws Exception {
-
 		windowState.clear();
-		context.clear();
+		triggerContext.clear();
+		processContext.window = window;
+		processContext.clear();
 		if (mergingWindows != null) {
 			mergingWindows.retireWindow(window);
 			mergingWindows.persist();

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 3745659..3d40716 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -23,8 +23,16 @@ import org.apache.commons.math3.util.ArithmeticUtils;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.AppendingState;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.MergingState;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
@@ -159,7 +167,9 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	/** This is given to the {@code InternalWindowFunction} for emitting elements with a given timestamp. */
 	protected transient TimestampedCollector<OUT> timestampedCollector;
 
-	protected transient Context context = new Context(null, null);
+	protected transient Context triggerContext = new Context(null, null);
+
+	protected transient WindowContext processContext = new WindowContext(null);
 
 	protected transient WindowAssigner.WindowAssignerContext windowAssignerContext;
 
@@ -264,7 +274,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		internalTimerService =
 				getInternalTimerService("window-timers", windowSerializer, this);
 
-		context = new Context(null, null);
+		triggerContext = new Context(null, null);
+		processContext = new WindowContext( null);
 
 		windowAssignerContext = new WindowAssigner.WindowAssignerContext() {
 			@Override
@@ -317,7 +328,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	public void close() throws Exception {
 		super.close();
 		timestampedCollector = null;
-		context = null;
+		triggerContext = null;
+		processContext = null;
 		windowAssignerContext = null;
 	}
 
@@ -325,7 +337,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	public void dispose() throws Exception {
 		super.dispose();
 		timestampedCollector = null;
-		context = null;
+		triggerContext = null;
+		processContext = null;
 		windowAssignerContext = null;
 	}
 
@@ -365,14 +378,14 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 									" window: " + mergeResult);
 						}
 
-						context.key = key;
-						context.window = mergeResult;
+						triggerContext.key = key;
+						triggerContext.window = mergeResult;
 
-						context.onMerge(mergedWindows);
+						triggerContext.onMerge(mergedWindows);
 
 						for (W m: mergedWindows) {
-							context.window = m;
-							context.clear();
+							triggerContext.window = m;
+							triggerContext.clear();
 							deleteCleanupTimer(m);
 						}
 
@@ -396,10 +409,10 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 				windowState.setCurrentNamespace(stateWindow);
 				windowState.add(element.getValue());
 
-				context.key = key;
-				context.window = actualWindow;
+				triggerContext.key = key;
+				triggerContext.window = actualWindow;
 
-				TriggerResult triggerResult = context.onElement(element);
+				TriggerResult triggerResult = triggerContext.onElement(element);
 
 				if (triggerResult.isFire()) {
 					ACC contents = windowState.get();
@@ -429,10 +442,10 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 				windowState.setCurrentNamespace(window);
 				windowState.add(element.getValue());
 
-				context.key = key;
-				context.window = window;
+				triggerContext.key = key;
+				triggerContext.window = window;
 
-				TriggerResult triggerResult = context.onElement(element);
+				TriggerResult triggerResult = triggerContext.onElement(element);
 
 				if (triggerResult.isFire()) {
 					ACC contents = windowState.get();
@@ -460,14 +473,14 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 
 	@Override
 	public void onEventTime(InternalTimer<K, W> timer) throws Exception {
-		context.key = timer.getKey();
-		context.window = timer.getNamespace();
+		triggerContext.key = timer.getKey();
+		triggerContext.window = timer.getNamespace();
 
 		MergingWindowSet<W> mergingWindows;
 
 		if (windowAssigner instanceof MergingWindowAssigner) {
 			mergingWindows = getMergingWindowSet();
-			W stateWindow = mergingWindows.getStateWindow(context.window);
+			W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
 			if (stateWindow == null) {
 				// Timer firing for non-existent window, this can only happen if a
 				// trigger did not clean up timers. We have already cleared the merging
@@ -477,7 +490,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 				windowState.setCurrentNamespace(stateWindow);
 			}
 		} else {
-			windowState.setCurrentNamespace(context.window);
+			windowState.setCurrentNamespace(triggerContext.window);
 			mergingWindows = null;
 		}
 
@@ -487,17 +500,17 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		}
 
 		if (contents != null) {
-			TriggerResult triggerResult = context.onEventTime(timer.getTimestamp());
+			TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp());
 			if (triggerResult.isFire()) {
-				emitWindowContents(context.window, contents);
+				emitWindowContents(triggerContext.window, contents);
 			}
 			if (triggerResult.isPurge()) {
 				windowState.clear();
 			}
 		}
 
-		if (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp())) {
-			clearAllState(context.window, windowState, mergingWindows);
+		if (windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
+			clearAllState(triggerContext.window, windowState, mergingWindows);
 		}
 
 		if (mergingWindows != null) {
@@ -508,14 +521,14 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 
 	@Override
 	public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
-		context.key = timer.getKey();
-		context.window = timer.getNamespace();
+		triggerContext.key = timer.getKey();
+		triggerContext.window = timer.getNamespace();
 
 		MergingWindowSet<W> mergingWindows;
 
 		if (windowAssigner instanceof MergingWindowAssigner) {
 			mergingWindows = getMergingWindowSet();
-			W stateWindow = mergingWindows.getStateWindow(context.window);
+			W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
 			if (stateWindow == null) {
 				// Timer firing for non-existent window, this can only happen if a
 				// trigger did not clean up timers. We have already cleared the merging
@@ -525,7 +538,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 				windowState.setCurrentNamespace(stateWindow);
 			}
 		} else {
-			windowState.setCurrentNamespace(context.window);
+			windowState.setCurrentNamespace(triggerContext.window);
 			mergingWindows = null;
 		}
 
@@ -535,17 +548,17 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		}
 
 		if (contents != null) {
-			TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp());
+			TriggerResult triggerResult = triggerContext.onProcessingTime(timer.getTimestamp());
 			if (triggerResult.isFire()) {
-				emitWindowContents(context.window, contents);
+				emitWindowContents(triggerContext.window, contents);
 			}
 			if (triggerResult.isPurge()) {
 				windowState.clear();
 			}
 		}
 
-		if (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp())) {
-			clearAllState(context.window, windowState, mergingWindows);
+		if (!windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
+			clearAllState(triggerContext.window, windowState, mergingWindows);
 		}
 
 		if (mergingWindows != null) {
@@ -559,14 +572,16 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	 * {@link Trigger#clear(Window, Trigger.TriggerContext)}.
 	 *
 	 * <p>The caller must ensure that the
-	 * correct key is set in the state backend and the context object.
+	 * correct key is set in the state backend and the triggerContext object.
 	 */
 	private void clearAllState(
 			W window,
 			AppendingState<IN, ACC> windowState,
 			MergingWindowSet<W> mergingWindows) throws Exception {
 		windowState.clear();
-		context.clear();
+		triggerContext.clear();
+		processContext.window = window;
+		processContext.clear();
 		if (mergingWindows != null) {
 			mergingWindows.retireWindow(window);
 			mergingWindows.persist();
@@ -579,7 +594,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	@SuppressWarnings("unchecked")
 	private void emitWindowContents(W window, ACC contents) throws Exception {
 		timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
-		userFunction.apply(context.key, context.window, contents, timestampedCollector);
+		processContext.window = window;
+		userFunction.process(triggerContext.key, window, processContext, contents, timestampedCollector);
 	}
 
 	/**
@@ -636,9 +652,9 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		}
 
 		if (windowAssigner.isEventTime()) {
-			context.registerEventTimeTimer(cleanupTime);
+			triggerContext.registerEventTimeTimer(cleanupTime);
 		} else {
-			context.registerProcessingTimeTimer(cleanupTime);
+			triggerContext.registerProcessingTimeTimer(cleanupTime);
 		}
 	}
 
@@ -654,9 +670,9 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 			return;
 		}
 		if (windowAssigner.isEventTime()) {
-			context.deleteEventTimeTimer(cleanupTime);
+			triggerContext.deleteEventTimeTimer(cleanupTime);
 		} else {
-			context.deleteProcessingTimeTimer(cleanupTime);
+			triggerContext.deleteProcessingTimeTimer(cleanupTime);
 		}
 	}
 
@@ -686,6 +702,134 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	}
 
 	/**
+	 * Base class for per-window {@link KeyedStateStore KeyedStateStores}. Used to allow per-window
+	 * state access for {@link org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction}.
+	 */
+	public abstract class AbstractPerWindowStateStore implements KeyedStateStore {
+
+		// we have this in the base class even though it's not used in MergingKeyStore so that
+		// we can always set it and ignore what actual implementation we have
+		protected W window;
+	}
+
+	/**
+	 * Special {@link AbstractPerWindowStateStore} that doesn't allow access to per-window state.
+	 */
+	public class MergingWindowStateStore extends AbstractPerWindowStateStore {
+		@Override
+		public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
+			throw new UnsupportedOperationException("Per-window state is not allowed when using merging windows.");
+		}
+
+		@Override
+		public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
+			throw new UnsupportedOperationException("Per-window state is not allowed when using merging windows.");
+		}
+
+		@Override
+		public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
+			throw new UnsupportedOperationException("Per-window state is not allowed when using merging windows.");
+		}
+
+		@Override
+		public <T, A> FoldingState<T, A> getFoldingState(FoldingStateDescriptor<T, A> stateProperties) {
+			throw new UnsupportedOperationException("Per-window state is not allowed when using merging windows.");
+		}
+
+		@Override
+		public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
+			throw new UnsupportedOperationException("Per-window state is not allowed when using merging windows.");
+		}
+	}
+
+	/**
+	 * Regular per-window state store for use with
+	 * {@link org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction}.
+	 */
+	public class PerWindowStateStore extends AbstractPerWindowStateStore {
+		@Override
+		public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
+			try {
+				return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
+			} catch (Exception e) {
+				throw new RuntimeException("Could not retrieve state", e);
+			}
+		}
+
+		@Override
+		public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
+			try {
+				return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
+			} catch (Exception e) {
+				throw new RuntimeException("Could not retrieve state", e);
+			}
+		}
+
+		@Override
+		public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
+			try {
+				return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
+			} catch (Exception e) {
+				throw new RuntimeException("Could not retrieve state", e);
+			}
+		}
+
+		@Override
+		public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
+			try {
+				return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
+			} catch (Exception e) {
+				throw new RuntimeException("Could not retrieve state", e);
+			}
+		}
+
+		@Override
+		public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
+			try {
+				return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
+			} catch (Exception e) {
+				throw new RuntimeException("Could not retrieve state", e);
+			}
+		}
+	}
+
+	/**
+	 * A utility class for handling {@code ProcessWindowFunction} invocations. This can be reused
+	 * by setting the {@code key} and {@code window} fields. No internal state must be kept in the
+	 * {@code WindowContext}.
+	 */
+	public class WindowContext implements InternalWindowFunction.InternalWindowContext {
+		protected W window;
+
+		protected AbstractPerWindowStateStore windowState;
+
+		public WindowContext(W window) {
+			this.window = window;
+			this.windowState = windowAssigner instanceof MergingWindowAssigner ?  new MergingWindowStateStore() : new PerWindowStateStore();
+		}
+
+		@Override
+		public String toString() {
+			return "WindowContext{Window = " + window.toString() + "}";
+		}
+
+		public void clear() throws Exception {
+			userFunction.clear(window, this);
+		}
+
+		@Override
+		public KeyedStateStore windowState() {
+			this.windowState.window = this.window;
+			return this.windowState;
+		}
+
+		@Override
+		public KeyedStateStore globalState() {
+			return WindowOperator.this.getKeyedStateStore();
+		}
+	}
+
+	/**
 	 * {@code Context} is a utility for handling {@code Trigger} invocations. It can be reused
 	 * by setting the {@code key} and {@code window} fields. No internal state must be kept in
 	 * the {@code Context}

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
index 9533c95..83e896d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
@@ -45,6 +46,8 @@ public final class InternalAggregateProcessAllWindowFunction<T, ACC, V, R, W ext
 
 	private final AggregateFunction<T, ACC, V> aggFunction;
 
+	private transient InternalProcessAllWindowContext<V, R, W> ctx;
+
 	public InternalAggregateProcessAllWindowFunction(
 			AggregateFunction<T, ACC, V> aggFunction,
 			ProcessAllWindowFunction<V, R, W> windowFunction) {
@@ -53,22 +56,31 @@ public final class InternalAggregateProcessAllWindowFunction<T, ACC, V, R, W ext
 	}
 
 	@Override
-	public void apply(Byte key, final W window, Iterable<T> input, Collector<R> out) throws Exception {
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
 		ProcessAllWindowFunction<V, R, W> wrappedFunction = this.wrappedFunction;
-		ProcessAllWindowFunction<V, R, W>.Context context = wrappedFunction.new Context() {
-			@Override
-			public W window() {
-				return window;
-			}
-		};
+		this.ctx = new InternalProcessAllWindowContext<>(wrappedFunction);
+	}
 
+	@Override
+	public void process(Byte key, final W window, final InternalWindowContext context, Iterable<T> input, Collector<R> out) throws Exception {
 		final ACC acc = aggFunction.createAccumulator();
 
 		for (T val : input) {
 			aggFunction.add(val, acc);
 		}
 
-		wrappedFunction.process(context, Collections.singletonList(aggFunction.getResult(acc)), out);
+		this.ctx.window = window;
+		this.ctx.internalContext = context;
+		ProcessAllWindowFunction<V, R, W> wrappedFunction = this.wrappedFunction;
+		wrappedFunction.process(ctx, Collections.singletonList(aggFunction.getResult(acc)), out);
+	}
+
+	@Override
+	public void clear(final W window, final InternalWindowContext context) throws Exception {
+		this.ctx.window = window;
+		this.ctx.internalContext = context;
+		wrappedFunction.clear(ctx);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java
index 433da9b..e14c9bd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java
@@ -46,30 +46,36 @@ public final class InternalAggregateProcessWindowFunction<T, ACC, V, R, K, W ext
 
 	private final AggregateFunction<T, ACC, V> aggFunction;
 
+	private final InternalProcessWindowContext<V, R, K, W> ctx;
+
 	public InternalAggregateProcessWindowFunction(
 			AggregateFunction<T, ACC, V> aggFunction,
 			ProcessWindowFunction<V, R, K, W> windowFunction) {
 		super(windowFunction);
 		this.aggFunction = aggFunction;
+		this.ctx = new InternalProcessWindowContext<>(windowFunction);
 	}
-	
-	@Override
-	public void apply(K key, final W window, Iterable<T> input, Collector<R> out) throws Exception {
-		ProcessWindowFunction<V, R, K, W> wrappedFunction = this.wrappedFunction;
-		ProcessWindowFunction<V, R, K, W>.Context context = wrappedFunction.new Context() {
-			@Override
-			public W window() {
-				return window;
-			}
-		};
 
+	@Override
+	public void process(K key, final W window, final InternalWindowContext context, Iterable<T> input, Collector<R> out) throws Exception {
 		final ACC acc = aggFunction.createAccumulator();
 
 		for (T val : input) {
 			aggFunction.add(val, acc);
 		}
 
-		wrappedFunction.process(key, context, Collections.singletonList(aggFunction.getResult(acc)), out);
+		this.ctx.window = window;
+		this.ctx.internalContext = context;
+		ProcessWindowFunction<V, R, K, W> wrappedFunction = this.wrappedFunction;
+		wrappedFunction.process(key, ctx, Collections.singletonList(aggFunction.getResult(acc)), out);
+	}
+
+	@Override
+	public void clear(final W window, final InternalWindowContext context) throws Exception {
+		this.ctx.window = window;
+		this.ctx.internalContext = context;
+		ProcessWindowFunction<V, R, K, W> wrappedFunction = this.wrappedFunction;
+		wrappedFunction.clear(ctx);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java
index 672bdb6..f2507ed 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java
@@ -39,11 +39,16 @@ public final class InternalIterableAllWindowFunction<IN, OUT, W extends Window>
 	}
 
 	@Override
-	public void apply(Byte key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception {
+	public void process(Byte key, W window, InternalWindowContext context, Iterable<IN> input, Collector<OUT> out) throws Exception {
 		wrappedFunction.apply(window, input, out);
 	}
 
 	@Override
+	public void clear(W window, InternalWindowContext context) throws Exception {
+
+	}
+
+	@Override
 	public RuntimeContext getRuntimeContext() {
 		throw new RuntimeException("This should never be called.");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessAllWindowFunction.java
index e33cc2a..47b7d55 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessAllWindowFunction.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.operators.windowing.functions;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
@@ -34,21 +35,33 @@ public final class InternalIterableProcessAllWindowFunction<IN, OUT, W extends W
 
 	private static final long serialVersionUID = 1L;
 
+	private transient InternalProcessAllWindowContext<IN, OUT, W> ctx;
+
 	public InternalIterableProcessAllWindowFunction(ProcessAllWindowFunction<IN, OUT, W> wrappedFunction) {
 		super(wrappedFunction);
 	}
 
 	@Override
-	public void apply(Byte key, final W window, Iterable<IN> input, Collector<OUT> out) throws Exception {
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		ProcessAllWindowFunction<IN, OUT, W> wrappedFunction = this.wrappedFunction;
+		this.ctx = new InternalProcessAllWindowContext<>(wrappedFunction);
+	}
+
+	@Override
+	public void process(Byte key, final W window, final InternalWindowContext context, Iterable<IN> input, Collector<OUT> out) throws Exception {
+		this.ctx.window = window;
+		this.ctx.internalContext = context;
+		ProcessAllWindowFunction<IN, OUT, W> wrappedFunction = this.wrappedFunction;
+		wrappedFunction.process(ctx, input, out);
+	}
+
+	@Override
+	public void clear(final W window, final InternalWindowContext context) throws Exception {
+		this.ctx.window = window;
+		this.ctx.internalContext = context;
 		ProcessAllWindowFunction<IN, OUT, W> wrappedFunction = this.wrappedFunction;
-		ProcessAllWindowFunction<IN, OUT, W>.Context context = wrappedFunction.new Context() {
-			@Override
-			public W window() {
-				return window;
-			}
-		};
-
-		wrappedFunction.process(context, input, out);
+		wrappedFunction.clear(ctx);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java
index de516a5..7eb015e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java
@@ -34,21 +34,27 @@ public final class InternalIterableProcessWindowFunction<IN, OUT, KEY, W extends
 
 	private static final long serialVersionUID = 1L;
 
+	private final InternalProcessWindowContext<IN, OUT, KEY, W> ctx;
+
 	public InternalIterableProcessWindowFunction(ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction) {
 		super(wrappedFunction);
+		this.ctx = new InternalProcessWindowContext<>(wrappedFunction);
+	}
+
+	@Override
+	public void process(KEY key, final W window, final InternalWindowContext context, Iterable<IN> input, Collector<OUT> out) throws Exception {
+		this.ctx.window = window;
+		this.ctx.internalContext = context;
+		ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction = this.wrappedFunction;
+		wrappedFunction.process(key, ctx, input, out);
 	}
 
 	@Override
-	public void apply(KEY key, final W window, Iterable<IN> input, Collector<OUT> out) throws Exception {
+	public void clear(final W window, final InternalWindowContext context) throws Exception {
+		this.ctx.window = window;
+		this.ctx.internalContext = context;
 		ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction = this.wrappedFunction;
-		ProcessWindowFunction<IN, OUT, KEY, W>.Context context = wrappedFunction.new Context() {
-			@Override
-			public W window() {
-				return window;
-			}
-		};
-		
-		wrappedFunction.process(key, context, input, out);
+		wrappedFunction.clear(ctx);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
index 895b31f..e2f1517 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
@@ -39,11 +39,16 @@ public final class InternalIterableWindowFunction<IN, OUT, KEY, W extends Window
 	}
 
 	@Override
-	public void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception {
+	public void process(KEY key, W window, InternalWindowContext context, Iterable<IN> input, Collector<OUT> out) throws Exception {
 		wrappedFunction.apply(key, window, input, out);
 	}
 
 	@Override
+	public void clear(W window, InternalWindowContext context) throws Exception {
+
+	}
+
+	@Override
 	public RuntimeContext getRuntimeContext() {
 		throw new RuntimeException("This should never be called.");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessAllWindowContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessAllWindowContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessAllWindowContext.java
new file mode 100644
index 0000000..c70e161
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessAllWindowContext.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * Internal reusable context wrapper.
+ *
+ * @param <IN> The type of the input value.
+ * @param <OUT> The type of the output value.
+ * @param <W> The type of the window.
+ */
+@Internal
+public class InternalProcessAllWindowContext<IN, OUT, W extends Window>
+	extends ProcessAllWindowFunction<IN, OUT, W>.Context {
+
+	W window;
+	InternalWindowFunction.InternalWindowContext internalContext;
+
+	InternalProcessAllWindowContext(ProcessAllWindowFunction<IN, OUT, W> function) {
+		function.super();
+	}
+
+	@Override
+	public W window() {
+		return window;
+	}
+
+	@Override
+	public KeyedStateStore windowState() {
+		return internalContext.windowState();
+	}
+
+	@Override
+	public KeyedStateStore globalState() {
+		return internalContext.globalState();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessWindowContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessWindowContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessWindowContext.java
new file mode 100644
index 0000000..0f1c0ee
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessWindowContext.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * Internal reusable context wrapper.
+ *
+ * @param <IN> The type of the input value.
+ * @param <OUT> The type of the output value.
+ * @param <KEY> The type of the key.
+ * @param <W> The type of the window.
+ */
+@Internal
+public class InternalProcessWindowContext<IN, OUT, KEY, W extends Window>
+	extends ProcessWindowFunction<IN, OUT, KEY, W>.Context {
+
+	W window;
+	InternalWindowFunction.InternalWindowContext internalContext;
+
+	InternalProcessWindowContext(ProcessWindowFunction<IN, OUT, KEY, W> function) {
+		function.super();
+	}
+
+	@Override
+	public W window() {
+		return window;
+	}
+
+	@Override
+	public KeyedStateStore windowState() {
+		return internalContext.windowState();
+	}
+
+	@Override
+	public KeyedStateStore globalState() {
+		return internalContext.globalState();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java
index a34d3ec..e90bcf4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java
@@ -41,11 +41,16 @@ public final class InternalSingleValueAllWindowFunction<IN, OUT, W extends Windo
 	}
 
 	@Override
-	public void apply(Byte key, W window, IN input, Collector<OUT> out) throws Exception {
+	public void process(Byte key, W window, InternalWindowContext context, IN input, Collector<OUT> out) throws Exception {
 		wrappedFunction.apply(window, Collections.singletonList(input), out);
 	}
 
 	@Override
+	public void clear(W window, InternalWindowContext context) throws Exception {
+
+	}
+
+	@Override
 	public RuntimeContext getRuntimeContext() {
 		throw new RuntimeException("This should never be called.");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessAllWindowFunction.java
index 0284ef7..f7c6a08 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessAllWindowFunction.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.operators.windowing.functions;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
@@ -36,21 +37,33 @@ public final class InternalSingleValueProcessAllWindowFunction<IN, OUT, W extend
 
 	private static final long serialVersionUID = 1L;
 
+	private transient InternalProcessAllWindowContext<IN, OUT, W> ctx;
+
 	public InternalSingleValueProcessAllWindowFunction(ProcessAllWindowFunction<IN, OUT, W> wrappedFunction) {
 		super(wrappedFunction);
 	}
 
 	@Override
-	public void apply(Byte key, final W window, IN input, Collector<OUT> out) throws Exception {
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		ProcessAllWindowFunction<IN, OUT, W> wrappedFunction = this.wrappedFunction;
+		this.ctx = new InternalProcessAllWindowContext<>(wrappedFunction);
+	}
+
+	@Override
+	public void process(Byte key, final W window, final InternalWindowContext context, IN input, Collector<OUT> out) throws Exception {
+		this.ctx.window = window;
+		this.ctx.internalContext = context;
 		ProcessAllWindowFunction<IN, OUT, W> wrappedFunction = this.wrappedFunction;
-		ProcessAllWindowFunction<IN, OUT, W>.Context context = wrappedFunction.new Context() {
-			@Override
-			public W window() {
-				return window;
-			}
-		};
+		wrappedFunction.process(ctx, Collections.singletonList(input), out);
+	}
 
-		wrappedFunction.process(context, Collections.singletonList(input), out);
+	@Override
+	public void clear(final W window, final InternalWindowContext context) throws Exception {
+		this.ctx.window = window;
+		this.ctx.internalContext = context;
+		ProcessAllWindowFunction<IN, OUT, W> wrappedFunction = this.wrappedFunction;
+		wrappedFunction.clear(ctx);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
index 7a4e8c6..21d1639 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
@@ -36,21 +36,29 @@ public final class InternalSingleValueProcessWindowFunction<IN, OUT, KEY, W exte
 
 	private static final long serialVersionUID = 1L;
 
+	private final InternalProcessWindowContext<IN, OUT, KEY, W> ctx;
+
 	public InternalSingleValueProcessWindowFunction(ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction) {
 		super(wrappedFunction);
+		ctx = new InternalProcessWindowContext<>(wrappedFunction);
 	}
 
 	@Override
-	public void apply(KEY key, final W window, IN input, Collector<OUT> out) throws Exception {
+	public void process(KEY key, final W window, final InternalWindowContext context, IN input, Collector<OUT> out) throws Exception {
+		this.ctx.window = window;
+		this.ctx.internalContext = context;
+
 		ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction = this.wrappedFunction;
-		ProcessWindowFunction<IN, OUT, KEY, W>.Context context = wrappedFunction.new Context() {
-			@Override
-			public W window() {
-				return window;
-			}
-		};
+		wrappedFunction.process(key, ctx, Collections.singletonList(input), out);
+	}
 
-		wrappedFunction.process(key, context, Collections.singletonList(input), out);
+	@Override
+	public void clear(final W window, final InternalWindowContext context) throws Exception {
+		this.ctx.window = window;
+		this.ctx.internalContext = context;
+
+		ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction = this.wrappedFunction;
+		wrappedFunction.clear(ctx);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java
index 9a0a447..d5cc4a2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java
@@ -41,11 +41,16 @@ public final class InternalSingleValueWindowFunction<IN, OUT, KEY, W extends Win
 	}
 
 	@Override
-	public void apply(KEY key, W window, IN input, Collector<OUT> out) throws Exception {
+	public void process(KEY key, W window, InternalWindowContext context, IN input, Collector<OUT> out) throws Exception {
 		wrappedFunction.apply(key, window, Collections.singletonList(input), out);
 	}
 
 	@Override
+	public void clear(W window, InternalWindowContext context) throws Exception {
+
+	}
+
+	@Override
 	public RuntimeContext getRuntimeContext() {
 		throw new RuntimeException("This should never be called.");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
index 2eb4052..9834480 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.runtime.operators.windowing.functions;
 
 import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
@@ -29,15 +30,28 @@ import org.apache.flink.util.Collector;
  * @param <KEY> The type of the key.
  */
 public interface InternalWindowFunction<IN, OUT, KEY, W extends Window> extends Function {
-
 	/**
 	 * Evaluates the window and outputs none or several elements.
 	 *
-	 * @param key    The key for which this window is evaluated.
-	 * @param window The window that is being evaluated.
-	 * @param input  The elements in the window being evaluated.
-	 * @param out    A collector for emitting elements.
+	 * @param context The context in which the window is being evaluated.
+	 * @param input The elements in the window being evaluated.
+	 * @param out A collector for emitting elements.
+	 *
 	 * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
 	 */
-	void apply(KEY key, W window, IN input, Collector<OUT> out) throws Exception;
+	void process(KEY key, W window, InternalWindowContext context, IN input, Collector<OUT> out) throws Exception;
+
+	/**
+	 * Deletes any state in the {@code Context} when the Window is purged.
+	 *
+	 * @param context The context to which the window is being evaluated
+	 * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+	 */
+	void clear(W window, InternalWindowContext context) throws Exception;
+
+	interface InternalWindowContext extends java.io.Serializable {
+		KeyedStateStore windowState();
+
+		KeyedStateStore globalState();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
index 4b479f3..c4bed37 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
@@ -21,20 +21,28 @@ package org.apache.flink.streaming.api.operators;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.util.ListCollector;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.ByteSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.functions.windowing.FoldApplyProcessAllWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.FoldApplyProcessWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.FoldApplyWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
@@ -45,8 +53,8 @@ import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProces
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessAllWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
 import org.apache.flink.util.Collector;
-import org.junit.Test;
 import org.junit.Assert;
+import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -139,12 +147,26 @@ public class FoldApplyProcessWindowFunctionTest {
 
 		expected.add(initValue);
 
-		foldWindowFunction.process(0, foldWindowFunction.new Context() {
+		FoldApplyProcessWindowFunction<Integer, TimeWindow, Integer, Integer, Integer>.Context ctx = foldWindowFunction.new Context() {
 			@Override
 			public TimeWindow window() {
 				return new TimeWindow(0, 1);
 			}
-		}, input, new ListCollector<>(result));
+
+			@Override
+			public KeyedStateStore windowState() {
+				return new DummyKeyedStateStore();
+			}
+
+			@Override
+			public KeyedStateStore globalState() {
+				return new DummyKeyedStateStore();
+			}
+		};
+
+		foldWindowFunction.open(new Configuration());
+
+		foldWindowFunction.process(0, ctx, input, new ListCollector<>(result));
 
 		Assert.assertEquals(expected, result);
 	}
@@ -234,16 +256,58 @@ public class FoldApplyProcessWindowFunctionTest {
 
 		expected.add(initValue);
 
-		foldWindowFunction.process(foldWindowFunction.new Context() {
+		FoldApplyProcessAllWindowFunction<TimeWindow, Integer, Integer, Integer>.Context ctx = foldWindowFunction.new Context() {
 			@Override
 			public TimeWindow window() {
 				return new TimeWindow(0, 1);
 			}
-		}, input, new ListCollector<>(result));
+
+			@Override
+			public KeyedStateStore windowState() {
+				return new DummyKeyedStateStore();
+			}
+
+			@Override
+			public KeyedStateStore globalState() {
+				return new DummyKeyedStateStore();
+			}
+		};
+
+		foldWindowFunction.open(new Configuration());
+
+		foldWindowFunction.process(ctx, input, new ListCollector<>(result));
 
 		Assert.assertEquals(expected, result);
 	}
 
+	public static class DummyKeyedStateStore implements KeyedStateStore {
+
+		@Override
+		public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
+			return null;
+		}
+
+		@Override
+		public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
+			return null;
+		}
+
+		@Override
+		public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
+			return null;
+		}
+
+		@Override
+		public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
+			return null;
+		}
+
+		@Override
+		public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
+			return null;
+		}
+	}
+
 	public static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
 
 		public DummyStreamExecutionEnvironment() {


[3/3] flink git commit: [hotfix] Make GC test more strict in WindowOperatorContractTest

Posted by al...@apache.org.
[hotfix] Make GC test more strict in WindowOperatorContractTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/662ed33d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/662ed33d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/662ed33d

Branch: refs/heads/master
Commit: 662ed33d8f5baed95035b8176daf95a1caa0b278
Parents: fad201b
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Sat Mar 25 16:59:31 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Sat Mar 25 16:59:31 2017 +0100

----------------------------------------------------------------------
 .../windowing/WindowOperatorContractTest.java   | 21 ++++++++++++++++++--
 1 file changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/662ed33d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
index faab505..3ae8f37 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
@@ -1853,13 +1853,20 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 	}
 
 	private void testGarbageCollectionTimer(TimeDomainAdaptor timeAdaptor) throws Exception {
+		long allowedLateness = 20L;
+
+		if (timeAdaptor instanceof ProcessingTimeAdaptor) {
+			// we don't have allowed lateness for processing time
+			allowedLateness = 0;
+		}
+
 		WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
 		timeAdaptor.setIsEventTime(mockAssigner);
 		Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
 		InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
 
 		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
-				createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction);
+				createWindowOperator(mockAssigner, mockTrigger, allowedLateness, mockWindowFunction);
 
 		testHarness.open();
 
@@ -1879,7 +1886,17 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 
 		verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
 
-		timeAdaptor.advanceTime(testHarness, 19 + 20); // 19 is maxTime of the window
+		// verify that we can still fire on the GC timer
+		timeAdaptor.shouldFireOnTime(mockTrigger);
+
+		timeAdaptor.advanceTime(testHarness, 19 + allowedLateness); // 19 is maxTime of the window
+
+		// ensure that our trigger is still called
+		timeAdaptor.verifyTriggerCallback(mockTrigger, times(1), 19L + allowedLateness, null);
+
+		// ensure that our window function is called a last timer if the trigger
+		// fires on the GC timer
+		verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 20)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
 
 		verify(mockTrigger, times(1)).clear(anyTimeWindow(), anyTriggerContext());