You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/16 18:08:31 UTC

[04/24] flink git commit: [FLINK-2808] [streaming] Refactor and extend state backend abstraction

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index 89672df..671544e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -18,15 +18,18 @@
 
 package org.apache.flink.streaming.runtime.operators.windowing;
 
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 import org.apache.flink.util.Collector;
 
 import org.junit.After;
@@ -36,6 +39,7 @@ import org.mockito.stubbing.Answer;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executors;
@@ -152,36 +156,34 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 		try {
 			@SuppressWarnings("unchecked")
 			final Output<StreamRecord<String>> mockOut = mock(Output.class);
-			
-			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
-			when(mockContext.getTaskName()).thenReturn("Test task name");
+			final StreamTask<?, ?> mockTask = createMockTask();
 			
 			AbstractAlignedProcessingTimeWindowOperator<String, String, String, ?> op;
 
 			op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 5000, 1000);
-			op.setup(mockOut, mockContext);
-			op.open(new Configuration());
+			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
+			op.open();
 			assertTrue(op.getNextSlideTime() % 1000 == 0);
 			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
 			op.dispose();
 
 			op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1000, 1000);
-			op.setup(mockOut, mockContext);
-			op.open(new Configuration());
+			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
+			op.open();
 			assertTrue(op.getNextSlideTime() % 1000 == 0);
 			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
 			op.dispose();
 
 			op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1500, 1000);
-			op.setup(mockOut, mockContext);
-			op.open(new Configuration());
+			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
+			op.open();
 			assertTrue(op.getNextSlideTime() % 500 == 0);
 			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
 			op.dispose();
 
 			op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1200, 1100);
-			op.setup(mockOut, mockContext);
-			op.open(new Configuration());
+			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
+			op.open();
 			assertTrue(op.getNextSlideTime() % 100 == 0);
 			assertTrue(op.getNextEvaluationTime() % 1100 == 0);
 			op.dispose();
@@ -194,25 +196,27 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
 	@Test
 	public void testTumblingWindow() {
+		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
 		try {
 			final int windowSize = 50;
 			final CollectingOutput<Integer> out = new CollectingOutput<>(windowSize);
-
-			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
-			when(mockContext.getTaskName()).thenReturn("Test task name");
+			final Object lock = new Object();
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
 
 			// tumbling window that triggers every 20 milliseconds
 			AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer, ?> op =
 					new AccumulatingProcessingTimeWindowOperator<>(
 							validatingIdentityFunction, identitySelector, windowSize, windowSize);
 
-			op.setup(out, mockContext);
-			op.open(new Configuration());
+			op.setup(mockTask, new StreamConfig(new Configuration()), out);
+			op.open();
 
 			final int numElements = 1000;
 
 			for (int i = 0; i < numElements; i++) {
-				op.processElement(new StreamRecord<Integer>(i));
+				synchronized (lock) {
+					op.processElement(new StreamRecord<Integer>(i));
+				}
 				Thread.sleep(1);
 			}
 
@@ -232,27 +236,32 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
+		finally {
+			timerService.shutdown();
+		}
 	}
 
 	@Test
 	public void testSlidingWindow() {
+		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
 		try {
 			final CollectingOutput<Integer> out = new CollectingOutput<>(50);
-
-			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
-			when(mockContext.getTaskName()).thenReturn("Test task name");
-
+			final Object lock = new Object();
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
+			
 			// tumbling window that triggers every 20 milliseconds
 			AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer, ?> op =
 					new AccumulatingProcessingTimeWindowOperator<>(validatingIdentityFunction, identitySelector, 150, 50);
 
-			op.setup(out, mockContext);
-			op.open(new Configuration());
+			op.setup(mockTask, new StreamConfig(new Configuration()), out);
+			op.open();
 
 			final int numElements = 1000;
 
 			for (int i = 0; i < numElements; i++) {
-				op.processElement(new StreamRecord<Integer>(i));
+				synchronized (lock) {
+					op.processElement(new StreamRecord<Integer>(i));
+				}
 				Thread.sleep(1);
 			}
 
@@ -288,6 +297,9 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
+		finally {
+			timerService.shutdown();
+		}
 	}
 
 	@Test
@@ -296,39 +308,15 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
 		try {
 			final CollectingOutput<Integer> out = new CollectingOutput<>(50);
-
-			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
-			when(mockContext.getTaskName()).thenReturn("Test task name");
-
 			final Object lock = new Object();
-
-			doAnswer(new Answer<Void>() {
-				@Override
-				public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
-					final Long timestamp = (Long) invocationOnMock.getArguments()[0];
-					final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
-					timerService.schedule(
-							new Callable<Object>() {
-								@Override
-								public Object call() throws Exception {
-									synchronized (lock) {
-										target.trigger(timestamp);
-									}
-									return null;
-								}
-							},
-							timestamp - System.currentTimeMillis(),
-							TimeUnit.MILLISECONDS);
-					return null;
-				}
-			}).when(mockContext).registerTimer(anyLong(), any(Triggerable.class));
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
 
 			// tumbling window that triggers every 20 milliseconds
 			AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer, ?> op =
 					new AccumulatingProcessingTimeWindowOperator<>(validatingIdentityFunction, identitySelector, 50, 50);
 
-			op.setup(out, mockContext);
-			op.open(new Configuration());
+			op.setup(mockTask, new StreamConfig(new Configuration()), out);
+			op.open();
 
 			synchronized (lock) {
 				op.processElement(new StreamRecord<Integer>(1));
@@ -360,7 +348,8 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
-		} finally {
+		}
+		finally {
 			timerService.shutdown();
 		}
 	}
@@ -371,39 +360,15 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
 		try {
 			final CollectingOutput<Integer> out = new CollectingOutput<>(50);
-
-			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
-			when(mockContext.getTaskName()).thenReturn("Test task name");
-
 			final Object lock = new Object();
-
-			doAnswer(new Answer<Void>() {
-				@Override
-				public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
-					final Long timestamp = (Long) invocationOnMock.getArguments()[0];
-					final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
-					timerService.schedule(
-							new Callable<Object>() {
-								@Override
-								public Object call() throws Exception {
-									synchronized (lock) {
-										target.trigger(timestamp);
-									}
-									return null;
-								}
-							},
-							timestamp - System.currentTimeMillis(),
-							TimeUnit.MILLISECONDS);
-					return null;
-				}
-			}).when(mockContext).registerTimer(anyLong(), any(Triggerable.class));
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
 
 			// tumbling window that triggers every 20 milliseconds
 			AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer, ?> op =
 					new AccumulatingProcessingTimeWindowOperator<>(validatingIdentityFunction, identitySelector, 150, 50);
 
-			op.setup(out, mockContext);
-			op.open(new Configuration());
+			op.setup(mockTask, new StreamConfig(new Configuration()), out);
+			op.open();
 
 			synchronized (lock) {
 				op.processElement(new StreamRecord<Integer>(1));
@@ -426,31 +391,34 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
-		} finally {
+		}
+		finally {
 			timerService.shutdown();
 		}
 	}
 	
 	@Test
 	public void testEmitTrailingDataOnClose() {
+		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
 		try {
 			final CollectingOutput<Integer> out = new CollectingOutput<>();
-
-			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
-			when(mockContext.getTaskName()).thenReturn("Test task name");
+			final Object lock = new Object();
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
 			
 			// the operator has a window time that is so long that it will not fire in this test
 			final long oneYear = 365L * 24 * 60 * 60 * 1000;
 			AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer, ?> op = 
 					new AccumulatingProcessingTimeWindowOperator<>(validatingIdentityFunction, identitySelector,
 							oneYear, oneYear);
-			
-			op.setup(out, mockContext);
-			op.open(new Configuration());
+
+			op.setup(mockTask, new StreamConfig(new Configuration()), out);
+			op.open();
 			
 			List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
 			for (Integer i : data) {
-				op.processElement(new StreamRecord<Integer>(i));
+				synchronized (lock) {
+					op.processElement(new StreamRecord<Integer>(i));
+				}
 			}
 			
 			op.close();
@@ -465,15 +433,18 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
+		finally {
+			timerService.shutdown();
+		}
 	}
 
 	@Test
 	public void testPropagateExceptionsFromClose() {
+		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
 		try {
 			final CollectingOutput<Integer> out = new CollectingOutput<>();
-
-			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
-			when(mockContext.getTaskName()).thenReturn("Test task name");
+			final Object lock = new Object();
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
 
 			WindowFunction<Integer, Integer, Integer, TimeWindow> failingFunction = new FailingFunction(100);
 
@@ -483,11 +454,13 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 					new AccumulatingProcessingTimeWindowOperator<>(
 							failingFunction, identitySelector, hundredYears, hundredYears);
 
-			op.setup(out, mockContext);
-			op.open(new Configuration());
+			op.setup(mockTask, new StreamConfig(new Configuration()), out);
+			op.open();
 
 			for (int i = 0; i < 150; i++) {
-				op.processElement(new StreamRecord<Integer>(i));
+				synchronized (lock) {
+					op.processElement(new StreamRecord<Integer>(i));
+				}
 			}
 			
 			try {
@@ -506,6 +479,9 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
+		finally {
+			timerService.shutdown();
+		}
 	}
 	
 	// ------------------------------------------------------------------------
@@ -551,4 +527,70 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			}
 		}
 	}
+
+	/**
+	 * Creates a Mockito mock of {@link StreamTask} for setting up the operator under test.
+	 *
+	 * <p>The mock returns an empty accumulator map, a fixed task name, a fresh
+	 * {@link ExecutionConfig}, and a mocked {@link Environment} that reports subtask index 0,
+	 * one subtask, and the class loader of this test class. Timer registration is not stubbed.
+	 *
+	 * @return the stubbed stream task mock
+	 */
+	private static StreamTask<?, ?> createMockTask() {
+		StreamTask<?, ?> task = mock(StreamTask.class);
+		when(task.getAccumulatorMap()).thenReturn(new HashMap<String, Accumulator<?, ?>>());
+		when(task.getName()).thenReturn("Test task name");
+		when(task.getExecutionConfig()).thenReturn(new ExecutionConfig());
+
+		Environment env = mock(Environment.class);
+		when(env.getIndexInSubtaskGroup()).thenReturn(0);
+		when(env.getNumberOfSubtasks()).thenReturn(1);
+		when(env.getUserClassLoader()).thenReturn(AccumulatingAlignedProcessingTimeWindowOperatorTest.class.getClassLoader());
+
+		when(task.getEnvironment()).thenReturn(env);
+
+		return task;
+	}
+
+	/**
+	 * Creates the mock from {@link #createMockTask()} and additionally stubs its
+	 * {@code registerTimer} method (for any long timestamp and any {@link Triggerable}) so
+	 * that the target is scheduled on the given executor.
+	 *
+	 * <p>The scheduled callback calls {@code target.trigger(timestamp)} while synchronizing on
+	 * {@code lock}; the delay is {@code timestamp - System.currentTimeMillis()} milliseconds.
+	 *
+	 * @param timerService executor on which the trigger callbacks are scheduled
+	 * @param lock object the callback synchronizes on while triggering the target
+	 * @return the stubbed stream task mock
+	 */
+	private static StreamTask<?, ?> createMockTaskWithTimer(
+			final ScheduledExecutorService timerService, final Object lock)
+	{
+		StreamTask<?, ?> mockTask = createMockTask();
+
+		doAnswer(new Answer<Void>() {
+			@Override
+			public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
+				final Long timestamp = (Long) invocationOnMock.getArguments()[0];
+				final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
+				timerService.schedule(
+						new Callable<Object>() {
+							@Override
+							public Object call() throws Exception {
+								synchronized (lock) {
+									target.trigger(timestamp);
+								}
+								return null;
+							}
+						},
+						timestamp - System.currentTimeMillis(),
+						TimeUnit.MILLISECONDS);
+				return null;
+			}
+		}).when(mockTask).registerTimer(anyLong(), any(Triggerable.class));
+
+		return mockTask;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index fa90e4a..106e833 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -18,14 +18,17 @@
 
 package org.apache.flink.streaming.runtime.operators.windowing;
 
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 
 import org.junit.After;
 import org.junit.Test;
@@ -34,6 +37,7 @@ import org.mockito.stubbing.Answer;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.Callable;
@@ -149,36 +153,34 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 		try {
 			@SuppressWarnings("unchecked")
 			final Output<StreamRecord<String>> mockOut = mock(Output.class);
+			final StreamTask<?, ?> mockTask = createMockTask();
 			
-			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
-			when(mockContext.getTaskName()).thenReturn("Test task name");
-
 			AggregatingProcessingTimeWindowOperator<String, String> op;
 
 			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 5000, 1000);
-			op.setup(mockOut, mockContext);
-			op.open(new Configuration());
+			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
+			op.open();
 			assertTrue(op.getNextSlideTime() % 1000 == 0);
 			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
 			op.dispose();
 
 			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1000, 1000);
-			op.setup(mockOut, mockContext);
-			op.open(new Configuration());
+			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
+			op.open();
 			assertTrue(op.getNextSlideTime() % 1000 == 0);
 			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
 			op.dispose();
 
 			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1500, 1000);
-			op.setup(mockOut, mockContext);
-			op.open(new Configuration());
+			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
+			op.open();
 			assertTrue(op.getNextSlideTime() % 500 == 0);
 			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
 			op.dispose();
 
 			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1200, 1100);
-			op.setup(mockOut, mockContext);
-			op.open(new Configuration());
+			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
+			op.open();
 			assertTrue(op.getNextSlideTime() % 100 == 0);
 			assertTrue(op.getNextEvaluationTime() % 1100 == 0);
 			op.dispose();
@@ -191,19 +193,20 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 
 	@Test
 	public void testTumblingWindowUniqueElements() {
+		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
 		try {
 			final int windowSize = 50;
 			final CollectingOutput<Integer> out = new CollectingOutput<>(windowSize);
-
-			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
-			when(mockContext.getTaskName()).thenReturn("Test task name");
 			
 			AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
 					new AggregatingProcessingTimeWindowOperator<>(
 							sumFunction, identitySelector, windowSize, windowSize);
-
-			op.setup(out, mockContext);
-			op.open(new Configuration());
+			
+			final Object lock = new Object();
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
+			
+			op.setup(mockTask, new StreamConfig(new Configuration()), out);
+			op.open();
 
 			final int numElements = 1000;
 
@@ -228,6 +231,9 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
+		finally {
+			timerService.shutdownNow();
+		}
 	}
 
 	@Test
@@ -239,37 +245,15 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			final int windowSize = 50;
 			final CollectingOutput<Integer> out = new CollectingOutput<>(windowSize);
 
-			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
-			when(mockContext.getTaskName()).thenReturn("Test task name");
-
 			final Object lock = new Object();
-			doAnswer(new Answer<Void>() {
-				@Override
-				public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
-					final Long timestamp = (Long) invocationOnMock.getArguments()[0];
-					final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
-					timerService.schedule(
-							new Callable<Object>() {
-								@Override
-								public Object call() throws Exception {
-									synchronized (lock) {
-										target.trigger(timestamp);
-									}
-									return null;
-								}
-							},
-							timestamp - System.currentTimeMillis(),
-							TimeUnit.MILLISECONDS);
-					return null;
-				}
-			}).when(mockContext).registerTimer(anyLong(), any(Triggerable.class));
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
 			
 			AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
 					new AggregatingProcessingTimeWindowOperator<>(
 							sumFunction, identitySelector, windowSize, windowSize);
-
-			op.setup(out, mockContext);
-			op.open(new Configuration());
+			
+			op.setup(mockTask, new StreamConfig(new Configuration()), out);
+			op.open();
 
 			final int numWindows = 10;
 
@@ -315,23 +299,26 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 
 	@Test
 	public void testSlidingWindow() {
+		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
 		try {
 			final CollectingOutput<Integer> out = new CollectingOutput<>(50);
 
-			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
-			when(mockContext.getTaskName()).thenReturn("Test task name");
+			final Object lock = new Object();
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
 
 			// tumbling window that triggers every 20 milliseconds
 			AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
 					new AggregatingProcessingTimeWindowOperator<>(sumFunction, identitySelector, 150, 50);
 
-			op.setup(out, mockContext);
-			op.open(new Configuration());
+			op.setup(mockTask, new StreamConfig(new Configuration()), out);
+			op.open();
 
 			final int numElements = 1000;
 
 			for (int i = 0; i < numElements; i++) {
-				op.processElement(new StreamRecord<Integer>(i));
+				synchronized (lock) {
+					op.processElement(new StreamRecord<Integer>(i));
+				}
 				Thread.sleep(1);
 			}
 
@@ -366,6 +353,9 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
+		finally {
+			timerService.shutdownNow();
+		}
 	}
 
 	@Test
@@ -374,38 +364,15 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 
 		try {
 			final CollectingOutput<Integer> out = new CollectingOutput<>(50);
-
-			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
-			when(mockContext.getTaskName()).thenReturn("Test task name");
-
 			final Object lock = new Object();
-			doAnswer(new Answer<Void>() {
-				@Override
-				public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
-					final Long timestamp = (Long) invocationOnMock.getArguments()[0];
-					final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
-					timerService.schedule(
-							new Callable<Object>() {
-								@Override
-								public Object call() throws Exception {
-									synchronized (lock) {
-										target.trigger(timestamp);
-									}
-									return null;
-								}
-							},
-							timestamp - System.currentTimeMillis(),
-							TimeUnit.MILLISECONDS);
-					return null;
-				}
-			}).when(mockContext).registerTimer(anyLong(), any(Triggerable.class));
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
 
 			// tumbling window that triggers every 20 milliseconds
 			AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
 					new AggregatingProcessingTimeWindowOperator<>(sumFunction, identitySelector, 150, 50);
 
-			op.setup(out, mockContext);
-			op.open(new Configuration());
+			op.setup(mockTask, new StreamConfig(new Configuration()), out);
+			op.open();
 
 			synchronized (lock) {
 				op.processElement(new StreamRecord<Integer>(1));
@@ -428,30 +395,33 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
-		} finally {
+		}
+		finally {
 			timerService.shutdown();
 		}
 	}
 	
 	@Test
 	public void testEmitTrailingDataOnClose() {
+		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
 		try {
 			final CollectingOutput<Integer> out = new CollectingOutput<>();
-
-			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
-			when(mockContext.getTaskName()).thenReturn("Test task name");
+			final Object lock = new Object();
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
 			
 			// the operator has a window time that is so long that it will not fire in this test
 			final long oneYear = 365L * 24 * 60 * 60 * 1000;
 			AggregatingProcessingTimeWindowOperator<Integer, Integer> op = 
 					new AggregatingProcessingTimeWindowOperator<>(sumFunction, identitySelector, oneYear, oneYear);
-			
-			op.setup(out, mockContext);
-			op.open(new Configuration());
+
+			op.setup(mockTask, new StreamConfig(new Configuration()), out);
+			op.open();
 			
 			List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
 			for (Integer i : data) {
-				op.processElement(new StreamRecord<Integer>(i));
+				synchronized (lock) {
+					op.processElement(new StreamRecord<Integer>(i));
+				}
 			}
 			
 			op.close();
@@ -466,15 +436,18 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
+		finally {
+			timerService.shutdown();
+		}
 	}
 
 	@Test
 	public void testPropagateExceptionsFromProcessElement() {
+		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
 		try {
 			final CollectingOutput<Integer> out = new CollectingOutput<>();
-
-			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
-			when(mockContext.getTaskName()).thenReturn("Test task name");
+			final Object lock = new Object();
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
 
 			ReduceFunction<Integer> failingFunction = new FailingFunction(100);
 
@@ -484,11 +457,13 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 					new AggregatingProcessingTimeWindowOperator<>(
 							failingFunction, identitySelector, hundredYears, hundredYears);
 
-			op.setup(out, mockContext);
-			op.open(new Configuration());
+			op.setup(mockTask, new StreamConfig(new Configuration()), out);
+			op.open();
 
 			for (int i = 0; i < 100; i++) {
-				op.processElement(new StreamRecord<Integer>(1));
+				synchronized (lock) {
+					op.processElement(new StreamRecord<Integer>(1));
+				}
 			}
 			
 			try {
@@ -505,6 +480,9 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
+		finally {
+			timerService.shutdown();
+		}
 	}
 	
 	// ------------------------------------------------------------------------
@@ -546,4 +524,70 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			return value1 + value2;
 		}
 	}
+
+	/**
+	 * Creates a Mockito mock of {@link StreamTask} for setting up the operator under test.
+	 *
+	 * <p>The mock returns an empty accumulator map, a fixed task name, a fresh
+	 * {@link ExecutionConfig}, and a mocked {@link Environment} that reports subtask index 0,
+	 * one subtask, and the class loader of this test class. Timer registration is not stubbed.
+	 *
+	 * @return the stubbed stream task mock
+	 */
+	private static StreamTask<?, ?> createMockTask() {
+		StreamTask<?, ?> task = mock(StreamTask.class);
+		when(task.getAccumulatorMap()).thenReturn(new HashMap<String, Accumulator<?, ?>>());
+		when(task.getName()).thenReturn("Test task name");
+		when(task.getExecutionConfig()).thenReturn(new ExecutionConfig());
+
+		Environment env = mock(Environment.class);
+		when(env.getIndexInSubtaskGroup()).thenReturn(0);
+		when(env.getNumberOfSubtasks()).thenReturn(1);
+		when(env.getUserClassLoader()).thenReturn(AggregatingAlignedProcessingTimeWindowOperatorTest.class.getClassLoader());
+
+		when(task.getEnvironment()).thenReturn(env);
+
+		return task;
+	}
+
+	/**
+	 * Creates the mock from {@link #createMockTask()} and additionally stubs its
+	 * {@code registerTimer} method (for any long timestamp and any {@link Triggerable}) so
+	 * that the target is scheduled on the given executor.
+	 *
+	 * <p>The scheduled callback calls {@code target.trigger(timestamp)} while synchronizing on
+	 * {@code lock}; the delay is {@code timestamp - System.currentTimeMillis()} milliseconds.
+	 *
+	 * @param timerService executor on which the trigger callbacks are scheduled
+	 * @param lock object the callback synchronizes on while triggering the target
+	 * @return the stubbed stream task mock
+	 */
+	private static StreamTask<?, ?> createMockTaskWithTimer(
+			final ScheduledExecutorService timerService, final Object lock)
+	{
+		StreamTask<?, ?> mockTask = createMockTask();
+
+		doAnswer(new Answer<Void>() {
+			@Override
+			public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
+				final Long timestamp = (Long) invocationOnMock.getArguments()[0];
+				final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
+				timerService.schedule(
+						new Callable<Object>() {
+							@Override
+							public Object call() throws Exception {
+								synchronized (lock) {
+									target.trigger(timestamp);
+								}
+								return null;
+							}
+						},
+						timestamp - System.currentTimeMillis(),
+						TimeUnit.MILLISECONDS);
+				return null;
+			}
+		}).when(mockTask).registerTimer(anyLong(), any(Triggerable.class));
+
+		return mockTask;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 06fca6b..6c48668 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -142,7 +142,7 @@ public class StreamTaskTestHarness<OUT> {
 		outEdgesInOrder.add(new StreamEdge(sourceVertexDummy, targetVertexDummy, 0, new LinkedList<String>(), new BroadcastPartitioner<Object>()));
 		streamConfig.setOutEdgesInOrder(outEdgesInOrder);
 		streamConfig.setNonChainedOutputs(outEdgesInOrder);
-		streamConfig.setTypeSerializerOut1(outputSerializer);
+		streamConfig.setTypeSerializerOut(outputSerializer);
 		streamConfig.setVertexID(0);
 
 	}
@@ -243,8 +243,8 @@ public class StreamTaskTestHarness<OUT> {
 		// first wait for all input queues to be empty
 		try {
 			Thread.sleep(1);
-		} catch (InterruptedException e) {
-		}
+		} catch (InterruptedException ignored) {}
+		
 		while (true) {
 			boolean allEmpty = true;
 			for (int i = 0; i < numInputGates; i++) {
@@ -254,8 +254,8 @@ public class StreamTaskTestHarness<OUT> {
 			}
 			try {
 				Thread.sleep(10);
-			} catch (InterruptedException e) {
-			}
+			} catch (InterruptedException ignored) {}
+			
 			if (allEmpty) {
 				break;
 			}
@@ -273,8 +273,7 @@ public class StreamTaskTestHarness<OUT> {
 
 			try {
 				Thread.sleep(1);
-			} catch (InterruptedException e) {
-			}
+			} catch (InterruptedException ignored) {}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTimerITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTimerITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTimerITCase.java
index 7a53ceb..cdc2c53 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTimerITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTimerITCase.java
@@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
@@ -59,7 +60,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
 
 		DataStream<String> source = env.addSource(new InfiniteTestSource());
 
-		source.transform("Custom Operator", BasicTypeInfo.STRING_TYPE_INFO, new TimerOperator(StreamOperator.ChainingStrategy.ALWAYS));
+		source.transform("Custom Operator", BasicTypeInfo.STRING_TYPE_INFO, new TimerOperator(ChainingStrategy.ALWAYS));
 
 		boolean testSuccess = false;
 		try {
@@ -95,7 +96,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
 
 		DataStream<String> source = env.addSource(new InfiniteTestSource());
 
-		source.transform("Custom Operator", BasicTypeInfo.STRING_TYPE_INFO, new TimerOperator(StreamOperator.ChainingStrategy.NEVER));
+		source.transform("Custom Operator", BasicTypeInfo.STRING_TYPE_INFO, new TimerOperator(ChainingStrategy.NEVER));
 
 		boolean testSuccess = false;
 		try {
@@ -134,7 +135,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
 		source.connect(source).transform(
 				"Custom Operator",
 				BasicTypeInfo.STRING_TYPE_INFO,
-				new TwoInputTimerOperator(StreamOperator.ChainingStrategy.NEVER));
+				new TwoInputTimerOperator(ChainingStrategy.NEVER));
 
 		boolean testSuccess = false;
 		try {
@@ -180,7 +181,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
 			}
 
 			if (first) {
-				getRuntimeContext().registerTimer(System.currentTimeMillis() + 100, this);
+				registerTimer(System.currentTimeMillis() + 100, this);
 				first = false;
 			}
 			numElements++;
@@ -197,7 +198,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
 			try {
 				numTimers++;
 				throwIfDone();
-				getRuntimeContext().registerTimer(System.currentTimeMillis() + 1, this);
+				registerTimer(System.currentTimeMillis() + 1, this);
 			} finally {
 				semaphore.release();
 			}
@@ -236,7 +237,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
 			}
 
 			if (first) {
-				getRuntimeContext().registerTimer(System.currentTimeMillis() + 100, this);
+				registerTimer(System.currentTimeMillis() + 100, this);
 				first = false;
 			}
 			numElements++;
@@ -251,7 +252,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
 			}
 
 			if (first) {
-				getRuntimeContext().registerTimer(System.currentTimeMillis() + 100, this);
+				registerTimer(System.currentTimeMillis() + 100, this);
 				first = false;
 			}
 			numElements++;
@@ -269,7 +270,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
 			try {
 				numTimers++;
 				throwIfDone();
-				getRuntimeContext().registerTimer(System.currentTimeMillis() + 1, this);
+				registerTimer(System.currentTimeMillis() + 1, this);
 			} finally {
 				semaphore.release();
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
index a88aa1a..dafba9c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
@@ -439,8 +439,8 @@ public class TimestampITCase {
 		}
 
 		@Override
-		public void open(Configuration parameters) throws Exception {
-			super.open(parameters);
+		public void open() throws Exception {
+			super.open();
 			watermarks = new ArrayList<Watermark>();
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
index 3651230..000a1a2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
@@ -21,17 +21,33 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class MockContext<IN, OUT> {
+	
 	private Collection<IN> inputs;
 	private List<OUT> outputs;
 
@@ -57,27 +73,63 @@ public class MockContext<IN, OUT> {
 
 	public static <IN, OUT> List<OUT> createAndExecute(OneInputStreamOperator<IN, OUT> operator, List<IN> inputs) {
 		MockContext<IN, OUT> mockContext = new MockContext<IN, OUT>(inputs);
-		StreamingRuntimeContext runtimeContext = new StreamingRuntimeContext(
-				new MockEnvironment("MockTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024),
-				new ExecutionConfig(), 
-				null, null, 
-				new HashMap<String, Accumulator<?, ?>>(),
-				null);
-
-		operator.setup(mockContext.output, runtimeContext);
+
+		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
+		final Object lock = new Object();
+		final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
+				
+		operator.setup(mockTask, new StreamConfig(new Configuration()), mockContext.output);
 		try {
-			operator.open(null);
+			operator.open();
 
 			StreamRecord<IN> nextRecord;
 			for (IN in: inputs) {
-				operator.processElement(new StreamRecord<IN>(in));
+				synchronized (lock) {
+					operator.processElement(new StreamRecord<IN>(in));
+				}
 			}
 
 			operator.close();
 		} catch (Exception e) {
 			throw new RuntimeException("Cannot invoke operator.", e);
+		} finally {
+			timerService.shutdownNow();
 		}
 
 		return mockContext.getOutputs();
 	}
+
+	private static StreamTask<?, ?> createMockTaskWithTimer(
+			final ScheduledExecutorService timerService, final Object lock)
+	{
+		StreamTask<?, ?> task = mock(StreamTask.class);
+		when(task.getAccumulatorMap()).thenReturn(new HashMap<String, Accumulator<?, ?>>());
+		when(task.getName()).thenReturn("Test task name");
+		when(task.getExecutionConfig()).thenReturn(new ExecutionConfig());
+		when(task.getEnvironment()).thenReturn(new MockEnvironment("MockTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024));
+		when(task.getCheckpointLock()).thenReturn(lock);
+
+		doAnswer(new Answer<Void>() {
+			@Override
+			public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
+				final Long timestamp = (Long) invocationOnMock.getArguments()[0];
+				final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
+				timerService.schedule(
+						new Callable<Object>() {
+							@Override
+							public Object call() throws Exception {
+								synchronized (lock) {
+									target.trigger(timestamp);
+								}
+								return null;
+							}
+						},
+						timestamp - System.currentTimeMillis(),
+						TimeUnit.MILLISECONDS);
+				return null;
+			}
+		}).when(task).registerTimer(anyLong(), any(Triggerable.class));
+
+		return task;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index f5ce3fc..edf3a09 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -18,25 +18,28 @@
 package org.apache.flink.streaming.util;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.state.LocalStateHandle;
+import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.state.StateBackend;
+import org.apache.flink.streaming.api.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.mockito.stubbing.OngoingStubbing;
 
-import java.io.Serializable;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 /**
  * A test harness for testing a {@link OneInputStreamOperator}.
  *
@@ -47,28 +50,39 @@ import java.util.concurrent.ConcurrentLinkedQueue;
  */
 public class OneInputStreamOperatorTestHarness<IN, OUT> {
 
-	OneInputStreamOperator<IN, OUT> operator;
+	final OneInputStreamOperator<IN, OUT> operator;
 
-	ConcurrentLinkedQueue<Object> outputList;
+	final ConcurrentLinkedQueue<Object> outputList;
 
-	ExecutionConfig executionConfig;
+	final ExecutionConfig executionConfig;
+	
+	final Object checkpointLock;
 
 	public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator) {
+		this(operator, new StreamConfig(new Configuration()));
+	}
+	
+	public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator, StreamConfig config) {
 		this.operator = operator;
+		this.outputList = new ConcurrentLinkedQueue<Object>();
+		this.executionConfig = new ExecutionConfig();
+		this.checkpointLock = new Object();
+
+		Environment env = new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024);
+		StreamTask<?, ?> mockTask = mock(StreamTask.class);
+		when(mockTask.getName()).thenReturn("Mock Task");
+		when(mockTask.getCheckpointLock()).thenReturn(checkpointLock);
+		when(mockTask.getConfiguration()).thenReturn(config);
+		when(mockTask.getEnvironment()).thenReturn(env);
+		when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
+		
+		// ugly Java generic hacks: cast via OngoingStubbing<?> so thenReturn(..) accepts a StateBackend<?>
+		@SuppressWarnings("unchecked")
+		OngoingStubbing<StateBackend<?>> stubbing = 
+				(OngoingStubbing<StateBackend<?>>) (OngoingStubbing<?>) when(mockTask.getStateBackend());
+		stubbing.thenReturn(MemoryStateBackend.defaultInstance());
 
-		outputList = new ConcurrentLinkedQueue<Object>();
-
-		executionConfig = new ExecutionConfig();
-
-		StreamingRuntimeContext runtimeContext =  new StreamingRuntimeContext(
-				new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024),
-				executionConfig,
-				null,
-				new LocalStateHandle.LocalStateHandleProvider<Serializable>(),
-				new HashMap<String, Accumulator<?, ?>>(),
-				new OneInputStreamTask());
-
-		operator.setup(new MockOutput(), runtimeContext);
+		operator.setup(mockTask, config, new MockOutput()); // same config as mockTask.getConfiguration()
 	}
 
 	/**
@@ -81,19 +95,10 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 	}
 
 	/**
-	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open(org.apache.flink.configuration.Configuration)}
-	 * with an empty {@link org.apache.flink.configuration.Configuration}.
+	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open()}.
 	 */
 	public void open() throws Exception {
-		operator.open(new Configuration());
-	}
-
-	/**
-	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open(org.apache.flink.configuration.Configuration)}
-	 * with the given {@link org.apache.flink.configuration.Configuration}.
-	 */
-	public void open(Configuration config) throws Exception {
-		operator.open(config);
+		operator.open();
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
index 428131a..2afdc40 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.util;
 
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -29,26 +28,30 @@ import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.state.LocalStateHandle;
 import org.apache.flink.streaming.api.functions.source.EventTimeSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+
+import static org.mockito.Mockito.*;
 
 public class SourceFunctionUtil<T> {
 
 	public static <T> List<T> runSourceFunction(SourceFunction<T> sourceFunction) throws Exception {
 		List<T> outputs = new ArrayList<T>();
+		
 		if (sourceFunction instanceof RichFunction) {
+
+			AbstractStreamOperator<?> operator = mock(AbstractStreamOperator.class);
+			when(operator.getExecutionConfig()).thenReturn(new ExecutionConfig());
+			
 			RuntimeContext runtimeContext =  new StreamingRuntimeContext(
+					operator,
 					new MockEnvironment("MockTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024),
-					new ExecutionConfig(), 
-					null, 
-					new LocalStateHandle.LocalStateHandleProvider<Serializable>(),
-					new HashMap<String, Accumulator<?, ?>>(),
-					null);
+					new HashMap<String, Accumulator<?, ?>>());
 			
 			((RichFunction) sourceFunction).setRuntimeContext(runtimeContext);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
index 2418f19..9b33c6a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
@@ -19,24 +19,27 @@
 package org.apache.flink.streaming.util;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.state.LocalStateHandle;
+import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.state.StateBackend;
+import org.apache.flink.streaming.api.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
-import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.mockito.stubbing.OngoingStubbing;
 
-import java.io.Serializable;
-import java.util.HashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 /**
  * A test harness for testing a {@link TwoInputStreamOperator}.
  *
@@ -49,26 +52,37 @@ public class TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> {
 
 	TwoInputStreamOperator<IN1, IN2, OUT> operator;
 
-	ConcurrentLinkedQueue<Object> outputList;
+	final ConcurrentLinkedQueue<Object> outputList;
+
+	final ExecutionConfig executionConfig;
 
-	ExecutionConfig executionConfig;
+	final Object checkpointLock;
 
 	public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator<IN1, IN2, OUT> operator) {
+		this(operator, new StreamConfig(new Configuration()));
+	}
+		
+	public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator<IN1, IN2, OUT> operator, StreamConfig config) {
 		this.operator = operator;
+		this.outputList = new ConcurrentLinkedQueue<Object>();
+		this.executionConfig = new ExecutionConfig();
+		this.checkpointLock = new Object();
+
+		Environment env = new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024);
+		StreamTask<?, ?> mockTask = mock(StreamTask.class);
+		when(mockTask.getName()).thenReturn("Mock Task");
+		when(mockTask.getCheckpointLock()).thenReturn(checkpointLock);
+		when(mockTask.getConfiguration()).thenReturn(config);
+		when(mockTask.getEnvironment()).thenReturn(env);
+		when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
+
+		// ugly Java generic hacks: cast via OngoingStubbing<?> so thenReturn(..) accepts a StateBackend<?>
+		@SuppressWarnings("unchecked")
+		OngoingStubbing<StateBackend<?>> stubbing =
+				(OngoingStubbing<StateBackend<?>>) (OngoingStubbing<?>) when(mockTask.getStateBackend());
+		stubbing.thenReturn(MemoryStateBackend.defaultInstance());
 
-		outputList = new ConcurrentLinkedQueue<Object>();
-
-		executionConfig = new ExecutionConfig();
-
-		StreamingRuntimeContext runtimeContext =  new StreamingRuntimeContext(
-				new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024),
-				new ExecutionConfig(),
-				null,
-				new LocalStateHandle.LocalStateHandleProvider<>(),
-				new HashMap<String, Accumulator<?, ?>>(),
-				new TwoInputStreamTask());
-
-		operator.setup(new MockOutput(), runtimeContext);
+		operator.setup(mockTask, config, new MockOutput()); // same config as mockTask.getConfiguration()
 	}
 
 	/**
@@ -82,19 +96,10 @@ public class TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> {
 
 
 	/**
-	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open(Configuration)}
-	 * with an empty {@link Configuration}.
+	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open()}.
 	 */
 	public void open() throws Exception {
-		operator.open(new Configuration());
-	}
-
-	/**
-	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open(Configuration)}
-	 * with the given {@link Configuration}.
-	 */
-	public void open(Configuration config) throws Exception {
-		operator.open(config);
+		operator.open();
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 0565f52..6855e00 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -228,11 +228,13 @@ class DataStream[T](javaStream: JavaStream[T]) {
   def keyBy[K: TypeInformation](fun: T => K): KeyedStream[T, K] = {
 
     val cleanFun = clean(fun)
+    val keyType: TypeInformation[K] = implicitly[TypeInformation[K]]
+    
     val keyExtractor = new KeySelector[T, K] with ResultTypeQueryable[K] {
       def getKey(in: T) = cleanFun(in)
-      override def getProducedType: TypeInformation[K] = implicitly[TypeInformation[K]]
+      override def getProducedType: TypeInformation[K] = keyType
     }
-    javaStream.keyBy(keyExtractor)
+    new JavaKeyedStream(javaStream, keyExtractor, keyType)
   }
 
   /**
@@ -431,32 +433,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
     val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
     javaStream.map(mapper).returns(outType).asInstanceOf[JavaStream[R]]
   }
-
-  /**
-   * Creates a new DataStream by applying the given stateful function to every element of this 
-   * DataStream. To use state partitioning, a key must be defined using .keyBy(..), in which 
-   * case an independent state will be kept per key.
-   * 
-   * Note that the user state object needs to be serializable.
-   */
-  def mapWithState[R: TypeInformation: ClassTag, S](
-      fun: (T, Option[S]) => (R, Option[S])): DataStream[R] = {
-    if (fun == null) {
-      throw new NullPointerException("Map function must not be null.")
-    }
-
-    val cleanFun = clean(fun)
-    val mapper = new RichMapFunction[T, R] with StatefulFunction[T, R, S] {
-      override def map(in: T): R = {
-        applyWithState(in, cleanFun)
-      }
-
-      val partitioned = isStatePartitioned
-    }
-    
-    map(mapper)
-  }
-
+  
   /**
    * Creates a new DataStream by applying the given function to every element and flattening
    * the results.
@@ -501,32 +478,6 @@ class DataStream[T](javaStream: JavaStream[T]) {
   }
 
   /**
-   * Creates a new DataStream by applying the given stateful function to every element and 
-   * flattening the results. To use state partitioning, a key must be defined using .keyBy(..), 
-   * in which case an independent state will be kept per key.
-   * 
-   * Note that the user state object needs to be serializable.
-   */
-  def flatMapWithState[R: TypeInformation: ClassTag, S](
-      fun: (T, Option[S]) => (TraversableOnce[R], Option[S])):
-      DataStream[R] = {
-    if (fun == null) {
-      throw new NullPointerException("Flatmap function must not be null.")
-    }
-
-    val cleanFun = clean(fun)
-    val flatMapper = new RichFlatMapFunction[T, R] with StatefulFunction[T,TraversableOnce[R],S]{
-      override def flatMap(in: T, out: Collector[R]): Unit = {
-        applyWithState(in, cleanFun) foreach out.collect
-      }
-
-      val partitioned = isStatePartitioned
-    }
-
-    flatMap(flatMapper)
-  }
-
-  /**
    * Creates a new DataStream that contains only the elements satisfying the given filter predicate.
    */
   def filter(filter: FilterFunction[T]): DataStream[T] = {
@@ -549,35 +500,6 @@ class DataStream[T](javaStream: JavaStream[T]) {
     }
     this.filter(filter)
   }
-  
-  /**
-   * Creates a new DataStream that contains only the elements satisfying the given stateful filter 
-   * predicate. To use state partitioning, a key must be defined using .keyBy(..), in which case
-   * an independent state will be kept per key.
-   * 
-   * Note that the user state object needs to be serializable.
-   */
-  def filterWithState[S](
-      fun: (T, Option[S]) => (Boolean, Option[S])): DataStream[T] = {
-    if (fun == null) {
-      throw new NullPointerException("Filter function must not be null.")
-    }
-
-    val cleanFun = clean(fun)
-    val filterFun = new RichFilterFunction[T] with StatefulFunction[T, Boolean, S] {
-      override def filter(in: T): Boolean = {
-        applyWithState(in, cleanFun)
-      }
-
-      val partitioned = isStatePartitioned
-    }
-    
-    filter(filterFun)
-  }
-
-  private[flink] def isStatePartitioned: Boolean = {
-    javaStream.isInstanceOf[JavaKeyedStream[_, _]]
-  }
 
   /**
    * Windows this DataStream into tumbling time windows.

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index a588931..84354a3 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -18,18 +18,19 @@
 
 package org.apache.flink.streaming.api.scala
 
-import org.apache.flink.streaming.api.datastream.{KeyedStream => KeyedJavaStream, DataStream => JavaStream, WindowedStream => WindowedJavaStream}
+import org.apache.flink.api.common.functions._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, KeyedStream => KeyedJavaStream, WindowedStream => WindowedJavaStream}
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
-import org.apache.flink.streaming.api.functions.aggregation.SumAggregator
-import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator
+import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator}
 import org.apache.flink.streaming.api.operators.StreamGroupedReduce
+import org.apache.flink.streaming.api.scala.function.StatefulFunction
 import org.apache.flink.streaming.api.windowing.assigners._
 import org.apache.flink.streaming.api.windowing.time.AbstractTime
-import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, Window, TimeWindow}
+import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow, Window}
+import org.apache.flink.util.Collector
+
 import scala.reflect.ClassTag
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.functions.FoldFunction
-import org.apache.flink.api.common.functions.ReduceFunction
 
 
 class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T](javaStream) {
@@ -262,10 +263,99 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
           javaStream.getExecutionConfig)
     }
 
-    val invokable =  new StreamGroupedReduce[T](reducer,javaStream.getKeySelector(),getType())
+    val invokable =  new StreamGroupedReduce[T](reducer,
+      getType().createSerializer(getExecutionConfig))
      
     new DataStream[T](javaStream.transform("aggregation", javaStream.getType(),invokable))
       .asInstanceOf[DataStream[T]]
   }
+
+  // ------------------------------------------------------------------------
+  //  functions with state
+  // ------------------------------------------------------------------------
+  
+  /**
+   * Creates a new DataStream that contains only the elements satisfying the given stateful filter
+   * predicate. The state is partitioned by the key of this stream (as defined via .keyBy(..)),
+   * so an independent state is kept per key.
+   *
+   * Note that the user state object needs to be serializable.
+   */
+  def filterWithState[S : TypeInformation](
+        fun: (T, Option[S]) => (Boolean, Option[S])): DataStream[T] = {
+    if (fun == null) {
+      throw new NullPointerException("Filter function must not be null.")
+    }
+
+    val cleanFun = clean(fun)
+    val stateTypeInfo: TypeInformation[S] = implicitly[TypeInformation[S]]
+
+    val filterFun = new RichFilterFunction[T] with StatefulFunction[T, Boolean, S] {
+
+      override val stateType: TypeInformation[S] = stateTypeInfo
+
+      override def filter(in: T): Boolean = {
+        applyWithState(in, cleanFun)
+      }
+    }
+
+    filter(filterFun)
+  }
+
+  /**
+   * Creates a new DataStream by applying the given stateful function to every element of this
+   * DataStream. The state is partitioned by the key of this stream (as defined via .keyBy(..)),
+   * so an independent state is kept per key.
+   *
+   * Note that the user state object needs to be serializable.
+   */
+  def mapWithState[R: TypeInformation: ClassTag, S: TypeInformation](
+        fun: (T, Option[S]) => (R, Option[S])): DataStream[R] = {
+    if (fun == null) {
+      throw new NullPointerException("Map function must not be null.")
+    }
+
+    val cleanFun = clean(fun)
+    val stateTypeInfo: TypeInformation[S] = implicitly[TypeInformation[S]]
+    
+    val mapper = new RichMapFunction[T, R] with StatefulFunction[T, R, S] {
+
+      override val stateType: TypeInformation[S] = stateTypeInfo
+      
+      override def map(in: T): R = {
+        applyWithState(in, cleanFun)
+      }
+    }
+
+    map(mapper)
+  }
+  
+  /**
+   * Creates a new DataStream by applying the given stateful function to every element and
+   * flattening the results. The state is partitioned by the key of this stream (as defined via
+   * .keyBy(..)), so an independent state is kept per key.
+   *
+   * Note that the user state object needs to be serializable.
+   */
+  def flatMapWithState[R: TypeInformation: ClassTag, S: TypeInformation](
+        fun: (T, Option[S]) => (TraversableOnce[R], Option[S])): DataStream[R] = {
+    if (fun == null) {
+      throw new NullPointerException("Flatmap function must not be null.")
+    }
+
+    val cleanFun = clean(fun)
+    val stateTypeInfo: TypeInformation[S] = implicitly[TypeInformation[S]]
+    
+    val flatMapper = new RichFlatMapFunction[T, R] with StatefulFunction[T,TraversableOnce[R],S]{
+
+      override val stateType: TypeInformation[S] = stateTypeInfo
+      
+      override def flatMap(in: T, out: Collector[R]): Unit = {
+        applyWithState(in, cleanFun) foreach out.collect
+      }
+    }
+
+    flatMap(flatMapper)
+  }
   
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index f767aba..29bf938 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -19,13 +19,14 @@
 package org.apache.flink.streaming.api.scala
 
 import java.util.Objects
+import java.util.Objects._
 
 import com.esotericsoftware.kryo.Serializer
 import org.apache.flink.api.common.io.{FileInputFormat, InputFormat}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
 import org.apache.flink.api.scala.ClosureCleaner
-import org.apache.flink.runtime.state.StateHandleProvider
+import org.apache.flink.streaming.api.state.StateBackend
 import org.apache.flink.streaming.api.{TimeCharacteristic, CheckpointingMode}
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
 import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType
@@ -184,17 +185,39 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
     javaEnv.enableCheckpointing()
     this
   }
-
-  /**
-   * Sets the given StateHandleProvider to be used for storing operator state
-   * checkpoints when checkpointing is enabled.
-   */
-  def setStateHandleProvider(provider: StateHandleProvider[_]): StreamExecutionEnvironment = {
-    javaEnv.setStateHandleProvider(provider)
+  
+  def getCheckpointingMode = javaEnv.getCheckpointingMode()
+
+  /**
+   * Sets the state backend that describes how to store and checkpoint operator state.
+   * It defines in what form the key/value state, accessible from operations on
+   * [[KeyedStream]], is maintained (heap, managed memory, externally), and where state
+   * snapshots/checkpoints are stored, both for the key/value state, and for checkpointed
+   * functions (implementing the interface
+   * [[org.apache.flink.streaming.api.checkpoint.Checkpointed]]).
+   *
+   * <p>The [[org.apache.flink.streaming.api.state.memory.MemoryStateBackend]] for example
+   * maintains the state in heap memory, as objects. It is lightweight without extra 
+   * dependencies, but can checkpoint only small states (some counters).
+   *
+   * <p>In contrast, the [[org.apache.flink.streaming.api.state.filesystem.FsStateBackend]]
+   * stores checkpoints of the state (also maintained as heap objects) in files. When using
+   * a replicated file system (like HDFS, S3, MapR FS, Tachyon, etc) this will guarantee
+   * that state is not lost upon failures of individual nodes and that the entire streaming
+   * program can be executed in a highly available and strongly consistent fashion (assuming
+   * that Flink is run in high-availability mode).
+   */
+  def setStateBackend(backend: StateBackend[_]): StreamExecutionEnvironment = {
+    javaEnv.setStateBackend(backend)
     this
   }
 
   /**
+   * Returns the state backend that defines how to store and checkpoint state.
+   */
+  def getStateBackend: StateBackend[_] = javaEnv.getStateBackend()
+  
+  /**
    * Sets the number of times that failed tasks are re-executed. A value of zero
    * effectively disables fault tolerance. A value of "-1" indicates that the system
    * default value (as defined in the configuration) should be used.

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala
index 89c9d00..5a591a8 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.scala.function
 
 import org.apache.flink.api.common.functions.RichFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.api.common.state.OperatorState
 
@@ -28,17 +29,20 @@ import org.apache.flink.api.common.state.OperatorState
  * call the applyWithState method in his own RichFunction implementation.
  */
 trait StatefulFunction[I, O, S] extends RichFunction {
-
-  var state: OperatorState[Option[S]] = _
-  val partitioned: Boolean
+  
+  var state: OperatorState[S] = _
+  val stateType: TypeInformation[S]
 
   def applyWithState(in: I, fun: (I, Option[S]) => (O, Option[S])): O = {
-    val (o, s) = fun(in, state.value)
-    state.update(s)
+    val (o, s: Option[S]) = fun(in, Option(state.value()))
+    s match {
+      case Some(v) => state.update(v)
+      case None => state.update(null.asInstanceOf[S])
+    }
     o
   }
 
   override def open(c: Configuration) = {
-    state = getRuntimeContext().getOperatorState("state", None, partitioned)
+    state = getRuntimeContext().getKeyValueState[S](stateType, null.asInstanceOf[S])
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index 91639ed..fe85fd1 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.streaming.api.scala
 
 import java.lang
-import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction, MapFunction, Partitioner, FoldFunction, Function}
+import org.apache.flink.api.common.functions._
 import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.streaming.api.collector.selector.OutputSelector
 import org.apache.flink.streaming.api.functions.co.CoMapFunction
@@ -28,12 +28,13 @@ import org.apache.flink.streaming.api.operators.{AbstractUdfStreamOperator, Stre
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
 import org.apache.flink.streaming.api.windowing.triggers.{PurgingTrigger, CountTrigger}
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
+import org.apache.flink.streaming.api.scala._
 import org.apache.flink.streaming.runtime.partitioner._
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.util.Collector
+
 import org.junit.Assert.fail
 import org.junit.Test
-import org.apache.flink.streaming.api.scala.function.StatefulFunction
 
 class DataStreamTest extends StreamingMultipleProgramsTestBase {
 
@@ -239,7 +240,7 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
    * Tests whether parallelism gets set.
    */
   @Test
-  def testParallelism {
+  def testParallelism() {
     val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(10)
 
     val src = env.fromElements(new Tuple2[Long, Long](0L, 0L))
@@ -259,7 +260,7 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
 
     try {
       src.setParallelism(3)
-      fail
+      fail()
     }
     catch {
       case success: IllegalArgumentException => {
@@ -290,14 +291,14 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
   }
 
   @Test
-  def testTypeInfo {
+  def testTypeInfo() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
 
     val src1: DataStream[Long] = env.generateSequence(0, 0)
     assert(TypeExtractor.getForClass(classOf[Long]) == src1.getType)
 
     val map: DataStream[(Integer, String)] = src1.map(x => null)
-    assert(classOf[scala.Tuple2[Integer, String]] == map.getType.getTypeClass)
+    assert(classOf[scala.Tuple2[Integer, String]] == map.getType().getTypeClass)
 
     val window: DataStream[String] = map
       .windowAll(GlobalWindows.create())
@@ -310,12 +311,12 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
       .windowAll(GlobalWindows.create())
       .trigger(PurgingTrigger.of(CountTrigger.of[GlobalWindow](5)))
       .fold(0, (accumulator: Int, value: String) => 0)
-    assert(TypeExtractor.getForClass(classOf[Int]) == flatten.getType)
+    assert(TypeExtractor.getForClass(classOf[Int]) == flatten.getType())
 
     // TODO check for custom case class
   }
 
-  @Test def operatorTest {
+  @Test def operatorTest() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
 
     val src = env.generateSequence(0, 0)
@@ -327,20 +328,14 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
     val map = src.map(mapFunction)
     assert(mapFunction == getFunctionForDataStream(map))
     assert(getFunctionForDataStream(map.map(x => 0)).isInstanceOf[MapFunction[_, _]])
-
-    val statefulMap1 = src.mapWithState((in, state: Option[Long]) => (in, None))
-    assert(getFunctionForDataStream(statefulMap1).isInstanceOf[MapFunction[_,_]])
-    assert(!getFunctionForDataStream(statefulMap1).
-        asInstanceOf[StatefulFunction[_,_,_]].partitioned)
     
-    val statefulMap2 = src.keyBy(x=>x).mapWithState(
-        (in, state: Option[Long]) => (in, None))
-    assert(getFunctionForDataStream(statefulMap2).
-        asInstanceOf[StatefulFunction[_,_,_]].partitioned)
+    val statefulMap2 = src.keyBy(x => x).mapWithState(
+        (in, state: Option[Long]) => (in, None.asInstanceOf[Option[Long]]))
     
     val flatMapFunction = new FlatMapFunction[Long, Int] {
       override def flatMap(value: Long, out: Collector[Int]): Unit = {}
     }
+    
     val flatMap = src.flatMap(flatMapFunction)
     assert(flatMapFunction == getFunctionForDataStream(flatMap))
     assert(
@@ -348,15 +343,8 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
         .flatMap((x: Int, out: Collector[Int]) => {}))
         .isInstanceOf[FlatMapFunction[_, _]])
 
-    val statefulfMap1 = src.flatMapWithState((in, state: Option[Long]) => (List(in), None))
-    assert(getFunctionForDataStream(statefulfMap1).isInstanceOf[FlatMapFunction[_, _]])
-    assert(!getFunctionForDataStream(statefulfMap1).
-        asInstanceOf[StatefulFunction[_, _, _]].partitioned)
-
-    val statefulfMap2 = src.keyBy(x=>x).flatMapWithState(
-        (in, state: Option[Long]) => (List(in), None))
-    assert(getFunctionForDataStream(statefulfMap2).
-        asInstanceOf[StatefulFunction[_, _, _]].partitioned)
+    val statefulfMap2 = src.keyBy(x => x).flatMapWithState(
+        (in, state: Option[Long]) => (List(in), None.asInstanceOf[Option[Long]]))
    
     val filterFunction = new FilterFunction[Int] {
       override def filter(value: Int): Boolean = false
@@ -369,15 +357,8 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
         .filter((x: Int) => true))
         .isInstanceOf[FilterFunction[_]])
 
-    val statefulFilter1 = src.filterWithState((in, state: Option[Long]) => (true, None))
-    assert(getFunctionForDataStream(statefulFilter1).isInstanceOf[FilterFunction[_]])
-    assert(!getFunctionForDataStream(statefulFilter1).
-        asInstanceOf[StatefulFunction[_, _, _]].partitioned)
-
-    val statefulFilter2 = src.keyBy(x=>x).filterWithState(
+    val statefulFilter2 = src.keyBy( x => x).filterWithState[Long](
         (in, state: Option[Long]) => (false, None))
-    assert(getFunctionForDataStream(statefulFilter2).
-        asInstanceOf[StatefulFunction[_, _, _]].partitioned)
    
     try {
       env.getStreamGraph.getStreamEdge(map.getId, unionFilter.getId)
@@ -412,7 +393,7 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
     assert(2 == moreOutputSelectors.size)
 
     val select = split.select("a")
-    val sink = select.print
+    val sink = select.print()
     val splitEdge =
       env.getStreamGraph.getStreamEdge(unionFilter.getId, sink.getTransformation.getId)
     assert("a" == splitEdge.getSelectedNames.get(0))
@@ -457,44 +438,44 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
   }
 
   @Test
-  def testChannelSelectors {
+  def testChannelSelectors() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
 
     val src = env.generateSequence(0, 0)
 
     val broadcast = src.broadcast
-    val broadcastSink = broadcast.print
+    val broadcastSink = broadcast.print()
     val broadcastPartitioner = env.getStreamGraph
       .getStreamEdge(src.getId, broadcastSink.getTransformation.getId).getPartitioner
     assert(broadcastPartitioner.isInstanceOf[BroadcastPartitioner[_]])
 
     val shuffle: DataStream[Long] = src.shuffle
-    val shuffleSink = shuffle.print
+    val shuffleSink = shuffle.print()
     val shufflePartitioner = env.getStreamGraph
       .getStreamEdge(src.getId, shuffleSink.getTransformation.getId).getPartitioner
     assert(shufflePartitioner.isInstanceOf[ShufflePartitioner[_]])
 
     val forward: DataStream[Long] = src.forward
-    val forwardSink = forward.print
+    val forwardSink = forward.print()
     val forwardPartitioner = env.getStreamGraph
       .getStreamEdge(src.getId, forwardSink.getTransformation.getId).getPartitioner
     assert(forwardPartitioner.isInstanceOf[ForwardPartitioner[_]])
 
     val rebalance: DataStream[Long] = src.rebalance
-    val rebalanceSink = rebalance.print
+    val rebalanceSink = rebalance.print()
     val rebalancePartitioner = env.getStreamGraph
       .getStreamEdge(src.getId, rebalanceSink.getTransformation.getId).getPartitioner
     assert(rebalancePartitioner.isInstanceOf[RebalancePartitioner[_]])
 
     val global: DataStream[Long] = src.global
-    val globalSink = global.print
+    val globalSink = global.print()
     val globalPartitioner = env.getStreamGraph
       .getStreamEdge(src.getId, globalSink.getTransformation.getId).getPartitioner
     assert(globalPartitioner.isInstanceOf[GlobalPartitioner[_]])
   }
 
   @Test
-  def testIterations {
+  def testIterations() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     // we need to rebalance before iteration
     val source = env.fromElements(1, 2, 3).map { t: Int => t }
@@ -512,10 +493,10 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
         val head = input.partitionByHash(1, 1).map(i => (i + 1).toString, s => s)
         (head.filter(_ == "2"), head.filter(_ != "2"))
       }, 1000).print()
-      fail
+      fail()
     } catch {
       case uoe: UnsupportedOperationException =>
-      case e: Exception => fail
+      case e: Exception => fail()
     }
 
     val sg = env.getStreamGraph
@@ -531,7 +512,7 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
     dataStream.print()
     val operator = getOperatorForDataStream(dataStream)
       .asInstanceOf[AbstractUdfStreamOperator[_, _]]
-    return operator.getUserFunction.asInstanceOf[Function]
+    operator.getUserFunction.asInstanceOf[Function]
   }
 
   private def getOperatorForDataStream(dataStream: DataStream[_]): StreamOperator[_] = {
@@ -542,15 +523,15 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
   }
 
   private def isPartitioned(edge: StreamEdge): Boolean = {
-    return edge.getPartitioner.isInstanceOf[HashPartitioner[_]]
+    edge.getPartitioner.isInstanceOf[HashPartitioner[_]]
   }
 
   private def isCustomPartitioned(edge: StreamEdge): Boolean = {
-    return edge.getPartitioner.isInstanceOf[CustomPartitionerWrapper[_, _]]
+    edge.getPartitioner.isInstanceOf[CustomPartitionerWrapper[_, _]]
   }
 
   private def createDownStreamId(dataStream: DataStream[_]): Integer = {
-    return dataStream.print.getTransformation.getId
+    dataStream.print().getTransformation.getId
   }
 
   private def createDownStreamId(dataStream: ConnectedStreams[_, _]): Integer = {

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StateTestPrograms.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StateTestPrograms.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StateTestPrograms.scala
index 650fd7e..7904bcb 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StateTestPrograms.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StateTestPrograms.scala
@@ -17,8 +17,9 @@
  */
 package org.apache.flink.streaming.api.scala
 
+import java.util
+
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
-import java.util.HashSet
 
 /**
  * Test programs for stateful functions.
@@ -30,11 +31,13 @@ object StateTestPrograms {
     
     // test stateful map
     env.generateSequence(0, 10).setParallelism(1)
+      .keyBy(x => x)
       .mapWithState((in, count: Option[Long]) =>
         count match {
-          case Some(c) => ((in - c), Some(c + 1))
+          case Some(c) => (in - c, Some(c + 1))
           case None => (in, Some(1L))
         }).setParallelism(1)
+      
       .addSink(new RichSinkFunction[Long]() {
         var allZero = true
         override def invoke(in: Long) = {
@@ -46,13 +49,17 @@ object StateTestPrograms {
       })
 
     // test stateful flatmap
-    env.fromElements("Fir st-", "Hello world").flatMapWithState((w, s: Option[String]) =>
-      s match {
-        case Some(s) => (w.split(" ").toList.map(s + _), Some(w))
-        case None => (List(w), Some(w))
-      }).setParallelism(1)
+    env.fromElements("Fir st-", "Hello world")
+      .keyBy(x => x)
+      .flatMapWithState((w, s: Option[String]) =>
+        s match {
+          case Some(state) => (w.split(" ").toList.map(state + _), Some(w))
+          case None => (List(w), Some(w))
+        })
+      .setParallelism(1)
+      
       .addSink(new RichSinkFunction[String]() {
-        val received = new HashSet[String]()
+        val received = new util.HashSet[String]()
         override def invoke(in: String) = { received.add(in) }
         override def close() = {
           assert(received.size() == 3)