You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/03/30 22:04:38 UTC

[27/50] [abbrv] flink git commit: [FLINK-5929] Allow Access to Per-Window State in ProcessWindowFunction

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
index d4fefa2..6f34607 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
@@ -21,9 +21,11 @@ package org.apache.flink.streaming.api.operators.windowing.functions;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.util.functions.StreamingFunctionUtils;
 import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichProcessAllWindowFunction;
@@ -31,6 +33,7 @@ import org.apache.flink.streaming.api.functions.windowing.RichProcessWindowFunct
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalAggregateProcessAllWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalAggregateProcessWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction;
@@ -41,6 +44,7 @@ import org.apache.flink.streaming.runtime.operators.windowing.functions.Internal
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessAllWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
 import org.apache.flink.util.Collector;
 import org.hamcrest.collection.IsIterableContainingInOrder;
 import org.junit.Test;
@@ -56,7 +60,11 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.collection.IsMapContaining.hasEntry;
 import static org.hamcrest.core.AllOf.allOf;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.anyObject;
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 
 public class InternalWindowFunctionTest {
 
@@ -93,7 +101,9 @@ public class InternalWindowFunctionTest {
 		Iterable<Long> i = (Iterable<Long>)mock(Iterable.class);
 		Collector<String> c = (Collector<String>) mock(Collector.class);
 
-		windowFunction.apply(((byte)0), w, i, c);
+		InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
+
+		windowFunction.process(((byte)0), w, ctx, i, c);
 		verify(mock).apply(w, i, c);
 
 		// check close
@@ -134,7 +144,8 @@ public class InternalWindowFunctionTest {
 		Iterable<Long> i = (Iterable<Long>)mock(Iterable.class);
 		Collector<String> c = (Collector<String>) mock(Collector.class);
 
-		windowFunction.apply(((byte)0), w, i, c);
+		InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
+		windowFunction.process(((byte)0), w, ctx, i, c);
 		verify(mock).process((ProcessAllWindowFunctionMock.Context) anyObject(), eq(i), eq(c));
 
 		// check close
@@ -175,7 +186,8 @@ public class InternalWindowFunctionTest {
 		Iterable<Long> i = (Iterable<Long>)mock(Iterable.class);
 		Collector<String> c = (Collector<String>) mock(Collector.class);
 
-		windowFunction.apply(42L, w, i, c);
+		InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
+		windowFunction.process(42L, w, ctx, i, c);
 		verify(mock).apply(eq(42L), eq(w), eq(i), eq(c));
 
 		// check close
@@ -215,8 +227,9 @@ public class InternalWindowFunctionTest {
 		TimeWindow w = mock(TimeWindow.class);
 		Iterable<Long> i = (Iterable<Long>)mock(Iterable.class);
 		Collector<String> c = (Collector<String>) mock(Collector.class);
+		InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
 
-		windowFunction.apply(42L, w, i, c);
+		windowFunction.process(42L, w, ctx, i, c);
 		verify(mock).process(eq(42L), (ProcessWindowFunctionMock.Context) anyObject(), eq(i), eq(c));
 
 		// check close
@@ -256,8 +269,9 @@ public class InternalWindowFunctionTest {
 		// check apply
 		TimeWindow w = mock(TimeWindow.class);
 		Collector<String> c = (Collector<String>) mock(Collector.class);
+		InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
 
-		windowFunction.apply(42L, w, 23L, c);
+		windowFunction.process(42L, w, ctx, 23L, c);
 		verify(mock).apply(eq(42L), eq(w), (Iterable<Long>)argThat(IsIterableContainingInOrder.contains(23L)), eq(c));
 
 		// check close
@@ -297,8 +311,9 @@ public class InternalWindowFunctionTest {
 		// check apply
 		TimeWindow w = mock(TimeWindow.class);
 		Collector<String> c = (Collector<String>) mock(Collector.class);
+		InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
 
-		windowFunction.apply(((byte)0), w, 23L, c);
+		windowFunction.process(((byte)0), w, ctx, 23L, c);
 		verify(mock).apply(eq(w), (Iterable<Long>)argThat(IsIterableContainingInOrder.contains(23L)), eq(c));
 
 		// check close
@@ -338,8 +353,9 @@ public class InternalWindowFunctionTest {
 		// check apply
 		TimeWindow w = mock(TimeWindow.class);
 		Collector<String> c = (Collector<String>) mock(Collector.class);
+		InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
 
-		windowFunction.apply(((byte)0), w, 23L, c);
+		windowFunction.process(((byte)0), w, ctx, 23L, c);
 		verify(mock).process((ProcessAllWindowFunctionMock.Context) anyObject(), (Iterable<Long>)argThat(IsIterableContainingInOrder.contains(23L)), eq(c));
 
 		// check close
@@ -378,8 +394,9 @@ public class InternalWindowFunctionTest {
 		// check apply
 		TimeWindow w = mock(TimeWindow.class);
 		Collector<String> c = (Collector<String>) mock(Collector.class);
+		InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
 
-		windowFunction.apply(42L, w, 23L, c);
+		windowFunction.process(42L, w, ctx,23L, c);
 		verify(mock).process(eq(42L), (ProcessWindowFunctionMock.Context) anyObject(), (Iterable<Long>)argThat(IsIterableContainingInOrder.contains(23L)), eq(c));
 
 		// check close
@@ -450,8 +467,9 @@ public class InternalWindowFunctionTest {
 		List<Long> args = new LinkedList<>();
 		args.add(23L);
 		args.add(24L);
+		InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
 
-		windowFunction.apply(42L, w, args, c);
+		windowFunction.process(42L, w, ctx, args, c);
 		verify(mock).process(
 				eq(42L),
 				(AggregateProcessWindowFunctionMock.Context) anyObject(),
@@ -528,8 +546,9 @@ public class InternalWindowFunctionTest {
 		List<Long> args = new LinkedList<>();
 		args.add(23L);
 		args.add(24L);
+		InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
 
-		windowFunction.apply(((byte)0), w, args, c);
+		windowFunction.process(((byte)0), w, ctx, args, c);
 		verify(mock).process(
 				(AggregateProcessAllWindowFunctionMock.Context) anyObject(),
 				(Iterable) argThat(containsInAnyOrder(allOf(
@@ -552,7 +571,9 @@ public class InternalWindowFunctionTest {
 		public void setOutputType(TypeInformation<String> outTypeInfo, ExecutionConfig executionConfig) { }
 
 		@Override
-		public void process(Long aLong, Context context, Iterable<Long> input, Collector<String> out) throws Exception { }
+		public void process(Long aLong, ProcessWindowFunction<Long, String, Long, TimeWindow>.Context context, Iterable<Long> elements, Collector<String> out) throws Exception {
+
+		}
 	}
 
 	public static class AggregateProcessWindowFunctionMock
@@ -565,7 +586,9 @@ public class InternalWindowFunctionTest {
 		public void setOutputType(TypeInformation<String> outTypeInfo, ExecutionConfig executionConfig) { }
 
 		@Override
-		public void process(Long aLong, Context context, Iterable<Map<Long, Long>> input, Collector<String> out) throws Exception { }
+		public void process(Long aLong, ProcessWindowFunction<Map<Long, Long>, String, Long, TimeWindow>.Context context, Iterable<Map<Long, Long>> elements, Collector<String> out) throws Exception {
+
+		}
 	}
 
 	public static class AggregateProcessAllWindowFunctionMock

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java
index 11508c5..ff1cbdf 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java
@@ -118,9 +118,9 @@ public class RegularWindowOperatorContractTest extends WindowOperatorContractTes
 
 		testHarness.processElement(new StreamRecord<>(1, 0L));
 
-		verify(mockWindowFunction, times(2)).apply(eq(1), anyTimeWindow(), anyInt(), WindowOperatorContractTest.<Void>anyCollector());
-		verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(0, 2)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
-		verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(2, 4)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(2)).process(eq(1), anyTimeWindow(), anyInternalWindowContext(), anyInt(), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(1)).process(eq(1), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), eq(3), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(1)).process(eq(1), eq(new TimeWindow(2, 4)), anyInternalWindowContext(), eq(3), WindowOperatorContractTest.<Void>anyCollector());
 
 		// clear is only called at cleanup time/GC time
 		verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
@@ -182,9 +182,9 @@ public class RegularWindowOperatorContractTest extends WindowOperatorContractTes
 
 		testHarness.processElement(new StreamRecord<>(1, 0L));
 
-		verify(mockWindowFunction, times(2)).apply(eq(1), anyTimeWindow(), anyInt(), WindowOperatorContractTest.<Void>anyCollector());
-		verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(0, 2)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
-		verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(2, 4)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(2)).process(eq(1), anyTimeWindow(), anyInternalWindowContext(), anyInt(), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(1)).process(eq(1), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), eq(3), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(1)).process(eq(1), eq(new TimeWindow(2, 4)), anyInternalWindowContext(), eq(3), WindowOperatorContractTest.<Void>anyCollector());
 
 		// clear is only called at cleanup time/GC time
 		verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
index 8aae46a..faab505 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
@@ -163,6 +163,10 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 		return Mockito.any();
 	}
 
+	static InternalWindowFunction.InternalWindowContext anyInternalWindowContext() {
+		return Mockito.any();
+	}
+
 	static Trigger.OnMergeContext anyOnMergeContext() {
 		return Mockito.any();
 	}
@@ -408,9 +412,9 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 
 		testHarness.processElement(new StreamRecord<>(0, 0L));
 
-		verify(mockWindowFunction, times(2)).apply(eq(0), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
-		verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
-		verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(2, 4)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(2)).process(eq(0), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(1)).process(eq(0), eq((new TimeWindow(0, 2))), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(2, 4)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
 	}
 
 	@Test
@@ -455,9 +459,9 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 
 		testHarness.processElement(new StreamRecord<>(0, 0L));
 
-		verify(mockWindowFunction, times(2)).apply(anyInt(), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
-		verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0, 0), WindowOperatorContractTest.<Void>anyCollector());
-		verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(0, 1)), intIterable(1, 1), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(2)).process(anyInt(), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0, 0), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(1)).process(eq(1), eq(new TimeWindow(0, 1)), anyInternalWindowContext(), intIterable(1, 1), WindowOperatorContractTest.<Void>anyCollector());
 	}
 
 	@Test
@@ -509,16 +513,16 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 			@Override
 			public Void answer(InvocationOnMock invocation) throws Exception {
 				@SuppressWarnings("unchecked")
-				Collector<String> out = invocation.getArgumentAt(3, Collector.class);
+				Collector<String> out = invocation.getArgumentAt(4, Collector.class);
 				out.collect("Hallo");
 				out.collect("Ciao");
 				return null;
 			}
-		}).when(mockWindowFunction).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<String>anyCollector());
+		}).when(mockWindowFunction).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<String>anyCollector());
 
 		testHarness.processElement(new StreamRecord<>(0, 0L));
 
-		verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<String>anyCollector());
+		verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<String>anyCollector());
 
 		assertThat(testHarness.extractOutputStreamRecords(),
 				contains(isStreamRecord("Hallo", 1L), isStreamRecord("Ciao", 1L)));
@@ -553,25 +557,25 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 			@Override
 			public Void answer(InvocationOnMock invocation) throws Exception {
 				@SuppressWarnings("unchecked")
-				Collector<String> out = invocation.getArgumentAt(3, Collector.class);
+				Collector<String> out = invocation.getArgumentAt(4, Collector.class);
 				out.collect("Hallo");
 				out.collect("Ciao");
 				return null;
 			}
-		}).when(mockWindowFunction).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<String>anyCollector());
+		}).when(mockWindowFunction).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<String>anyCollector());
 
 		timeAdaptor.shouldRegisterTimerOnElement(mockTrigger, 1);
 
 		testHarness.processElement(new StreamRecord<>(0, 0L));
 
-		verify(mockWindowFunction, never()).apply(anyInt(), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<String>anyCollector());
+		verify(mockWindowFunction, never()).process(anyInt(), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<String>anyCollector());
 		assertTrue(testHarness.extractOutputStreamRecords().isEmpty());
 
 		timeAdaptor.shouldFireOnTime(mockTrigger);
 
 		timeAdaptor.advanceTime(testHarness, 1L);
 
-		verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<String>anyCollector());
+		verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<String>anyCollector());
 
 		assertThat(testHarness.extractOutputStreamRecords(),
 				contains(isStreamRecord("Hallo", 1L), isStreamRecord("Ciao", 1L)));
@@ -650,9 +654,9 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 
 		testHarness.processElement(new StreamRecord<>(0, 0L));
 
-		verify(mockWindowFunction, times(2)).apply(eq(0), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
-		verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
-		verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(2, 4)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(2)).process(eq(0), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(2, 4)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
 
 		// clear is only called at cleanup time/GC time
 		verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
@@ -693,9 +697,9 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 
 		testHarness.processElement(new StreamRecord<>(0, 0L));
 
-		verify(mockWindowFunction, times(2)).apply(eq(0), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
-		verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
-		verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(2, 4)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(2)).process(eq(0), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(2, 4)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
 
 		// clear is only called at cleanup time/GC time
 		verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
@@ -858,9 +862,9 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 
 		timeAdaptor.advanceTime(testHarness, 0L);
 
-		verify(mockWindowFunction, times(2)).apply(eq(0), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
-		verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
-		verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(2, 4)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(2)).process(eq(0), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(2, 4)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
 
 		// clear is only called at cleanup time/GC time
 		verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
@@ -919,9 +923,9 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 
 		timeAdaptor.advanceTime(testHarness, 0L);
 
-		verify(mockWindowFunction, times(2)).apply(eq(0), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
-		verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
-		verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(2, 4)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(2)).process(eq(0), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(2, 4)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
 
 		// clear is only called at cleanup time/GC time
 		verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
@@ -1050,7 +1054,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 		timeAdaptor.verifyTriggerCallback(mockTrigger, never(), null, null);
 
 		verify(mockWindowFunction, never())
-				.apply(anyInt(), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<List<Integer>>anyCollector());
+				.process(anyInt(), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<List<Integer>>anyCollector());
 
 		assertEquals(1, timeAdaptor.numTimers(testHarness)); // only gc timers left
 	}
@@ -1114,7 +1118,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 		timeAdaptor.verifyTriggerCallback(mockTrigger, never(), null, null);
 
 		verify(mockWindowFunction, never())
-				.apply(anyInt(), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<List<Integer>>anyCollector());
+				.process(anyInt(), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<List<Integer>>anyCollector());
 
 		assertEquals(1, timeAdaptor.numTimers(testHarness)); // only gc timers left
 	}
@@ -1186,7 +1190,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 		timeAdaptor.verifyTriggerCallback(mockTrigger, times(1), null, null);
 
 		verify(mockWindowFunction, never())
-				.apply(anyInt(), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<List<Integer>>anyCollector());
+				.process(anyInt(), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<List<Integer>>anyCollector());
 
 		// now we trigger the dangling timer
 		timeAdaptor.advanceTime(testHarness, 10L);
@@ -2208,7 +2212,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 
 		testHarness.processElement(new StreamRecord<>(0, 0L));
 
-		verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
 
 		// clear is only called at cleanup time/GC time
 		verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
@@ -2326,9 +2330,9 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 
 		verify(mockTrigger, times(2)).clear(anyTimeWindow(), anyTriggerContext());
 
-		verify(mockWindowFunction, times(2)).apply(eq(0), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
-		verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
-		verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(2, 4)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(2)).process(eq(0), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+		verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(2, 4)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
 
 		// it's also called for the cleanup timers
 		verify(mockTrigger, times(4)).onEventTime(anyLong(), anyTimeWindow(), anyTriggerContext());
@@ -2339,6 +2343,96 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 		assertEquals(0, testHarness.numEventTimeTimers());
 	}
 
+	@Test
+	public void testPerWindowStateSetAndClearedOnEventTimePurge() throws Exception {
+		testPerWindowStateSetAndClearedOnPurge(new EventTimeAdaptor());
+	}
+
+	@Test
+	public void testPerWindowStateSetAndClearedOnProcessingTimePurge() throws Exception {
+		testPerWindowStateSetAndClearedOnPurge(new ProcessingTimeAdaptor());
+	}
+
+	public void testPerWindowStateSetAndClearedOnPurge(TimeDomainAdaptor timeAdaptor) throws Exception {
+		WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
+		timeAdaptor.setIsEventTime(mockAssigner);
+		Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+		InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
+			createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction);
+
+		testHarness.open();
+
+		when(mockTrigger.onElement(anyInt(), anyLong(), anyTimeWindow(), anyTriggerContext()))
+			.thenReturn(TriggerResult.FIRE);
+
+		when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+			.thenReturn(Arrays.asList(new TimeWindow(0, 20)));
+
+		doAnswer(new Answer<Object>() {
+			@Override
+			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+				InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[2];
+				context.windowState().getState(valueStateDescriptor).update("hello");
+				return null;
+			}
+		}).when(mockWindowFunction).process(anyInt(), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
+
+		doAnswer(new Answer<Object>() {
+			@Override
+			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+				InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[1];
+				context.windowState().getState(valueStateDescriptor).clear();
+				return null;
+			}
+		}).when(mockWindowFunction).clear(anyTimeWindow(), anyInternalWindowContext());
+
+		assertEquals(0, testHarness.getOutput().size());
+		assertEquals(0, testHarness.numKeyedStateEntries());
+
+		testHarness.processElement(new StreamRecord<>(0, 0L));
+
+		assertEquals(2, testHarness.numKeyedStateEntries()); // window contents plus value state
+		assertEquals(1, timeAdaptor.numTimers(testHarness)); // gc timers
+
+		timeAdaptor.advanceTime(testHarness, 19 + 20); // 19 is maxTime of the window
+
+		assertEquals(0, testHarness.numKeyedStateEntries());
+		assertEquals(0, timeAdaptor.numTimers(testHarness));
+	}
+
+	@Test
+	public void testWindowStateNotAvailableToMergingWindows() throws Exception {
+		WindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner();
+		Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+		InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
+			createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction);
+
+		testHarness.open();
+
+		when(mockTrigger.onElement(anyInt(), anyLong(), anyTimeWindow(), anyTriggerContext()))
+			.thenReturn(TriggerResult.FIRE);
+
+		when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+			.thenReturn(Arrays.asList(new TimeWindow(0, 20)));
+
+		doAnswer(new Answer<Object>() {
+			@Override
+			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+				InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[2];
+				context.windowState().getState(valueStateDescriptor).update("hello");
+				return null;
+			}
+		}).when(mockWindowFunction).process(anyInt(), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
+
+		expectedException.expect(UnsupportedOperationException.class);
+		expectedException.expectMessage("Per-window state is not allowed when using merging windows.");
+		testHarness.processElement(new StreamRecord<>(0, 0L));
+	}
+
 	protected abstract <W extends Window, OUT> KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> createWindowOperator(
 			WindowAssigner<Integer, W> assigner,
 			Trigger<Integer, W> trigger,

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
index 163117b..2f0e48e 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
@@ -22,6 +22,7 @@ import java.io.Serializable
 
 import org.apache.flink.annotation.PublicEvolving
 import org.apache.flink.api.common.functions.Function
+import org.apache.flink.api.common.state.KeyedStateStore
 import org.apache.flink.streaming.api.windowing.windows.Window
 import org.apache.flink.util.Collector
 
@@ -47,6 +48,15 @@ abstract class ProcessAllWindowFunction[IN, OUT, W <: Window] extends Function w
   def process(context: Context, elements: Iterable[IN], out: Collector[OUT])
 
   /**
+    * Deletes any state in the [[Context]] when the Window is purged. Does nothing by default.
+    *
+    * @param context The context in which the window is being evaluated
+    * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+    */
+  @throws[Exception]
+  def clear(context: Context): Unit = {}
+
+  /**
     * The context holding window metadata
     */
   abstract class Context {
@@ -54,6 +64,16 @@ abstract class ProcessAllWindowFunction[IN, OUT, W <: Window] extends Function w
       * @return The window that is being evaluated.
       */
     def window: W
+
+    /**
+      * State accessor for per-key and per-window state; `clear` is the hook for deleting it.
+      */
+    def windowState: KeyedStateStore
+
+    /**
+      * State accessor for per-key global state (as opposed to the per-window `windowState`).
+      */
+    def globalState: KeyedStateStore
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
index 79f3918..bdf6ae6 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
@@ -22,6 +22,7 @@ import java.io.Serializable
 
 import org.apache.flink.annotation.PublicEvolving
 import org.apache.flink.api.common.functions.Function
+import org.apache.flink.api.common.state.KeyedStateStore
 import org.apache.flink.streaming.api.windowing.windows.Window
 import org.apache.flink.util.Collector
 
@@ -49,6 +50,15 @@ abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function
   def process(key: KEY, context: Context, elements: Iterable[IN], out: Collector[OUT])
 
   /**
+    * Deletes any state in the [[Context]] when the Window is purged. Does nothing by default.
+    *
+    * @param context The context in which the window is being evaluated
+    * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+    */
+  @throws[Exception]
+  def clear(context: Context): Unit = {}
+
+  /**
     * The context holding window metadata
     */
   abstract class Context {
@@ -56,6 +66,16 @@ abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function
       * @return The window that is being evaluated.
       */
     def window: W
+
+    /**
+      * State accessor for per-key and per-window state; `clear` is the hook for deleting it.
+      */
+    def windowState: KeyedStateStore
+
+    /**
+      * State accessor for per-key global state (as opposed to the per-window `windowState`).
+      */
+    def globalState: KeyedStateStore
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
index a4fec64..fac5958 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
@@ -52,10 +52,25 @@ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window](
       out: Collector[OUT]): Unit = {
     val ctx = new func.Context {
       override def window = context.window
+
+      override def windowState = context.windowState()
+
+      override def globalState = context.globalState()
     }
     func.process(key, ctx, elements.asScala, out)
   }
 
+  override def clear(context: JProcessWindowFunction[IN, OUT, KEY, W]#Context): Unit = {
+    // Wrap the incoming context in a func.Context so it can be passed to func.clear.
+    val wrappedContext = new func.Context {
+      // Every accessor delegates to the incoming context each time it is called.
+      override def window = context.window
+      override def windowState = context.windowState()
+      override def globalState = context.globalState()
+    }
+    func.clear(wrappedContext)
+  }
+
   override def setRuntimeContext(t: RuntimeContext): Unit = {
     super.setRuntimeContext(t)
     func match {
@@ -99,10 +114,26 @@ final class ScalaProcessAllWindowFunctionWrapper[IN, OUT, W <: Window](
       out: Collector[OUT]): Unit = {
     val ctx = new func.Context {
       override def window = context.window
+
+      override def windowState = context.windowState()
+
+      override def globalState = context.globalState()
     }
     func.process(ctx, elements.asScala, out)
   }
 
+  override def clear(context: JProcessAllWindowFunction[IN, OUT, W]#Context): Unit = {
+    // Wrap the incoming context in a func.Context so it can be passed to func.clear.
+    val wrappedContext = new func.Context {
+      // Every accessor delegates to the incoming context each time it is called.
+      override def window = context.window
+      override def windowState = context.windowState()
+      override def globalState = context.globalState()
+    }
+    func.clear(wrappedContext)
+  }
+
+
   override def setRuntimeContext(t: RuntimeContext): Unit = {
     super.setRuntimeContext(t)
     func match {