You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/17 14:14:52 UTC

[4/6] flink git commit: [FLINK-2462] [streaming] Major cleanup of operator structure for exception handling and code simplication

http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
index 8c354be..2ca2862 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
@@ -45,28 +45,32 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
  * Implementation of the {@link RuntimeContext}, created by runtime stream UDF
  * operators.
  */
-@SuppressWarnings("rawtypes")
 public class StreamingRuntimeContext extends RuntimeUDFContext {
 
 	private final Environment env;
-	private final Map<String, StreamOperatorState> states;
-	private final List<PartitionedStreamOperatorState> partitionedStates;
+	private final Map<String, StreamOperatorState<?, ?>> states;
+	private final List<PartitionedStreamOperatorState<?, ?, ?>> partitionedStates;
 	private final KeySelector<?, ?> statePartitioner;
 	private final StateHandleProvider<Serializable> provider;
-	private final ClassLoader cl;
-
+	
+	
 	@SuppressWarnings("unchecked")
-	public StreamingRuntimeContext(String name, Environment env, ClassLoader userCodeClassLoader,
-			ExecutionConfig executionConfig, KeySelector<?, ?> statePartitioner,
-			StateHandleProvider<?> provider, Map<String, Accumulator<?, ?>> accumulatorMap) {
-		super(name, env.getNumberOfSubtasks(), env.getIndexInSubtaskGroup(), userCodeClassLoader,
-				executionConfig, env.getDistributedCacheEntries(), accumulatorMap);
+	public StreamingRuntimeContext(
+			Environment env,
+			ExecutionConfig executionConfig,
+			KeySelector<?, ?> statePartitioner,
+			StateHandleProvider<?> provider,
+			Map<String, Accumulator<?, ?>> accumulatorMap) {
+		
+		super(env.getTaskName(), env.getNumberOfSubtasks(), env.getIndexInSubtaskGroup(),
+				env.getUserClassLoader(), executionConfig,
+				env.getDistributedCacheEntries(), accumulatorMap);
+		
 		this.env = env;
 		this.statePartitioner = statePartitioner;
-		this.states = new HashMap<String, StreamOperatorState>();
-		this.partitionedStates = new LinkedList<PartitionedStreamOperatorState>();
+		this.states = new HashMap<>();
+		this.partitionedStates = new LinkedList<>();
 		this.provider = (StateHandleProvider<Serializable>) provider;
-		this.cl = userCodeClassLoader;
 	}
 
 	/**
@@ -121,14 +125,14 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
 
 	public StreamOperatorState<?, ?> getState(String name, boolean partitioned) {
 		// Try fetching state from the map
-		StreamOperatorState state = states.get(name);
+		StreamOperatorState<?, ?> state = states.get(name);
 		if (state == null) {
 			// If not found, create empty state and add to the map
 			state = createRawState(partitioned);
 			states.put(name, state);
 			// We keep a reference to all partitioned states for registering input
 			if (state instanceof PartitionedStreamOperatorState) {
-				partitionedStates.add((PartitionedStreamOperatorState) state);
+				partitionedStates.add((PartitionedStreamOperatorState<?, ?, ?>) state);
 			}
 		}
 		return state;
@@ -139,11 +143,11 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
 	 * 
 	 * @return An empty operator state.
 	 */
-	@SuppressWarnings("unchecked")
-	public StreamOperatorState createRawState(boolean partitioned) {
+	@SuppressWarnings({"rawtypes", "unchecked"})
+	public StreamOperatorState<?, ?> createRawState(boolean partitioned) {
 		if (partitioned) {
 			if (statePartitioner != null) {
-				return new PartitionedStreamOperatorState(provider, statePartitioner, cl);
+				return new PartitionedStreamOperatorState(provider, statePartitioner, getUserCodeClassLoader());
 			} else {
 				throw new RuntimeException(
 						"Partitioned state can only be used with KeyedDataStreams.");
@@ -158,7 +162,7 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
 	 * 
 	 * @return All the states for the underlying operator.
 	 */
-	public Map<String, StreamOperatorState> getOperatorStates() {
+	public Map<String, StreamOperatorState<?, ?>> getOperatorStates() {
 		return states;
 	}
 
@@ -169,7 +173,7 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
 	 * @param nextRecord
 	 *            Next input of the operator.
 	 */
-	@SuppressWarnings("unchecked")
+	@SuppressWarnings({"unchecked", "rawtypes"})
 	public void setNextInput(StreamRecord<?> nextRecord) {
 		if (statePartitioner != null) {
 			for (PartitionedStreamOperatorState state : partitionedStates) {

http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 8cf5a40..5d0497d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -27,114 +26,65 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputStreamOperator<IN1, IN2, OUT>> {
 
-	private static final Logger LOG = LoggerFactory.getLogger(TwoInputStreamTask.class);
-
 	private StreamTwoInputProcessor<IN1, IN2> inputProcessor;
+	
+	private volatile boolean running = true;
 
 	@Override
-	public void registerInputOutput() {
-		try {
-			super.registerInputOutput();
+	public void init() throws Exception {
+		TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
+		TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
 	
-			TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
-			TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
+		int numberOfInputs = configuration.getNumberOfInputs();
 	
-			int numberOfInputs = configuration.getNumberOfInputs();
+		ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
+		ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
 	
-			ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
-			ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
+		List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
 	
-			List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
-	
-			for (int i = 0; i < numberOfInputs; i++) {
-				int inputType = inEdges.get(i).getTypeNumber();
-				InputGate reader = getEnvironment().getInputGate(i);
-				switch (inputType) {
-					case 1:
-						inputList1.add(reader);
-						break;
-					case 2:
-						inputList2.add(reader);
-						break;
-					default:
-						throw new RuntimeException("Invalid input type number: " + inputType);
-				}
+		for (int i = 0; i < numberOfInputs; i++) {
+			int inputType = inEdges.get(i).getTypeNumber();
+			InputGate reader = getEnvironment().getInputGate(i);
+			switch (inputType) {
+				case 1:
+					inputList1.add(reader);
+					break;
+				case 2:
+					inputList2.add(reader);
+					break;
+				default:
+					throw new RuntimeException("Invalid input type number: " + inputType);
 			}
-	
-			this.inputProcessor = new StreamTwoInputProcessor<IN1, IN2>(inputList1, inputList2,
-					inputDeserializer1, inputDeserializer2,
-					getCheckpointBarrierListener(),
-					configuration.getCheckpointMode(),
-					getEnvironment().getIOManager(),
-					getExecutionConfig().areTimestampsEnabled());
-
-			// make sure that stream tasks report their I/O statistics
-			AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
-			AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter();
-			this.inputProcessor.setReporter(reporter);
-		}
-		catch (Exception e) {
-			throw new RuntimeException("Failed to initialize stream operator: " + e.getMessage(), e);
 		}
+	
+		this.inputProcessor = new StreamTwoInputProcessor<IN1, IN2>(inputList1, inputList2,
+				inputDeserializer1, inputDeserializer2,
+				getCheckpointBarrierListener(),
+				configuration.getCheckpointMode(),
+				getEnvironment().getIOManager(),
+				getExecutionConfig().areTimestampsEnabled());
+
+		// make sure that stream tasks report their I/O statistics
+		AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
+		AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter();
+		this.inputProcessor.setReporter(reporter);
 	}
 
 	@Override
-	public void invoke() throws Exception {
-		boolean operatorOpen = false;
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Task {} invoked", getName());
-		}
-
-		try {
-
-			openOperator();
-			operatorOpen = true;
-
-			while (inputProcessor.processInput(streamOperator)) {
-				// do nothing, just keep processing
-			}
-
-			closeOperator();
-			operatorOpen = false;
-
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Task {} invocation finished", getName());
-			}
-
-		}
-		catch (Exception e) {
-			LOG.error(getEnvironment().getTaskNameWithSubtasks() + " failed", e);
-
-			if (operatorOpen) {
-				try {
-					closeOperator();
-				}
-				catch (Throwable t) {
-					LOG.warn("Exception while closing operator.", t);
-				}
-			}
-
-			throw e;
-		}
-		finally {
-			this.isRunning = false;
-			// Cleanup
-			outputHandler.flushOutputs();
-			clearBuffers();
-		}
-
+	protected void run() throws Exception {
+		while (running && inputProcessor.processInput(streamOperator));
 	}
 
 	@Override
-	public void clearBuffers() throws IOException {
-		super.clearBuffers();
-		inputProcessor.clearBuffers();
+	protected void cleanup() throws Exception {
 		inputProcessor.cleanup();
 	}
+
+	@Override
+	protected void cancelTask() {
+		running = false;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
index 6ca38b7..60db798 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
@@ -141,9 +141,12 @@ public class StatefulOperatorTest {
 			KeySelector<Integer, Serializable> partitioner, byte[] serializedState) throws Exception {
 		final List<String> outputList = output;
 
-		StreamingRuntimeContext context = new StreamingRuntimeContext("MockTask", new MockEnvironment(3 * 1024 * 1024,
-				new MockInputSplitProvider(), 1024), null, new ExecutionConfig(), partitioner,
-				new LocalStateHandleProvider<Serializable>(), new HashMap<String, Accumulator<?, ?>>());
+		StreamingRuntimeContext context = new StreamingRuntimeContext(
+				new MockEnvironment("MockTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024), 
+				new ExecutionConfig(),
+				partitioner,
+				new LocalStateHandleProvider<Serializable>(),
+				new HashMap<String, Accumulator<?, ?>>());
 
 		StreamMap<Integer, String> op = new StreamMap<Integer, String>(new StatefulMapper());
 
@@ -217,14 +220,15 @@ public class StatefulOperatorTest {
 			}
 		}
 		
-		@SuppressWarnings({ "rawtypes", "unchecked" })
+		@SuppressWarnings("unchecked")
 		@Override
 		public void close() throws Exception {
-			Map<String, StreamOperatorState> states = ((StreamingRuntimeContext) getRuntimeContext()).getOperatorStates();
+			Map<String, StreamOperatorState<?, ?>> states = ((StreamingRuntimeContext) getRuntimeContext()).getOperatorStates();
 			PartitionedStreamOperatorState<Integer, Integer, Integer> groupCounter = (PartitionedStreamOperatorState<Integer, Integer, Integer>) states.get("groupCounter");
 			for (Entry<Serializable, Integer> count : groupCounter.getPartitionedState().entrySet()) {
 				Integer key = (Integer) count.getKey();
 				Integer expected = key < 3 ? 2 : 1;
+				
 				assertEquals(new MutableInt(expected), count.getValue());
 			}
 		}
@@ -257,11 +261,12 @@ public class StatefulOperatorTest {
 			groupCounter = getRuntimeContext().getOperatorState("groupCounter", 0, true);
 		}
 		
-		@SuppressWarnings({ "rawtypes", "unchecked" })
+		@SuppressWarnings("unchecked")
 		@Override
 		public void close() throws Exception {
-			Map<String, StreamOperatorState> states = ((StreamingRuntimeContext) getRuntimeContext()).getOperatorStates();
-			PartitionedStreamOperatorState<Integer, Integer, Integer> groupCounter = (PartitionedStreamOperatorState<Integer, Integer, Integer>) states.get("groupCounter");
+			Map<String, StreamOperatorState<?, ?>> states = ((StreamingRuntimeContext) getRuntimeContext()).getOperatorStates();
+			PartitionedStreamOperatorState<Integer, Integer, Integer> groupCounter = 
+					(PartitionedStreamOperatorState<Integer, Integer, Integer>) states.get("groupCounter");
 			for (Entry<Serializable, Integer> count : groupCounter.getPartitionedState().entrySet()) {
 				Integer key = (Integer) count.getKey();
 				Integer expected = key < 3 ? 2 : 1;

http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
index f07e3a5..7cc1958 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
@@ -85,12 +85,7 @@ public class StreamRecordWriterTest {
 		}
 		finally {
 			if (testWriter != null) {
-				try {
-					testWriter.close();
-				}
-				catch (IOException e) {
-					// ignore in tests
-				}
+				testWriter.close();
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 296324a..4c6957b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -168,7 +168,6 @@ public class OneInputStreamTaskTest {
 	 * This test verifies that checkpoint barriers are correctly forwarded.
 	 */
 	@Test
-	@SuppressWarnings("unchecked")
 	public void testCheckpointBarriers() throws Exception {
 		final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<String, String>();
 		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<String, String>(mapTask, 2, 2, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
@@ -226,7 +225,6 @@ public class OneInputStreamTaskTest {
 	 * then all inputs receive barriers from a later checkpoint.
 	 */
 	@Test
-	@SuppressWarnings("unchecked")
 	public void testOvertakingCheckpointBarriers() throws Exception {
 		final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<String, String>();
 		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<String, String>(mapTask, 2, 2, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
@@ -282,7 +280,6 @@ public class OneInputStreamTaskTest {
 
 		testHarness.waitForInputProcessing();
 
-
 		testHarness.endInput();
 
 		testHarness.waitForTaskCompletion();

http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
index 4f07fdb..7fb8ba3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
@@ -62,7 +62,6 @@ public class OneInputStreamTaskTestHarness<IN, OUT> extends StreamTaskTestHarnes
 	 */
 	public OneInputStreamTaskTestHarness(OneInputStreamTask<IN, OUT> task,
 			int numInputGates,
-
 			int numInputChannelsPerGate,
 			TypeInformation<IN> inputType,
 			TypeInformation<OUT> outputType) {

http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 0f372cb..987d0bc 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -83,7 +83,7 @@ public class StreamTaskTestHarness<OUT> {
 
 	private ConcurrentLinkedQueue<Object> outputList;
 
-	protected Thread taskThread;
+	protected TaskThread taskThread;
 
 	// These don't get initialized, the one-input/two-input specific test harnesses
 	// must initialize these if they want to simulate input. We have them here so that all the
@@ -161,32 +161,19 @@ public class StreamTaskTestHarness<OUT> {
 
 		task.registerInputOutput();
 
-		taskThread = new Thread(new Runnable() {
-			@Override
-			public void run() {
-
-
-
-				try {
-					task.invoke();
-					shutdownIOManager();
-					shutdownMemoryManager();
-				} catch (Exception e) {
-					throw new RuntimeException(e);
-				}
-
-			}
-		});
-
+		taskThread = new TaskThread(task);
 		taskThread.start();
 	}
 
-	public void waitForTaskCompletion() throws InterruptedException {
+	public void waitForTaskCompletion() throws Exception {
 		if (taskThread == null) {
 			throw new IllegalStateException("Task thread was not started.");
 		}
 
 		taskThread.join();
+		if (taskThread.getError() != null) {
+			throw new Exception("error in task", taskThread.getError());
+		}
 	}
 
 	/**
@@ -300,5 +287,36 @@ public class StreamTaskTestHarness<OUT> {
 			inputGates[i].endInput();
 		}
 	}
+	
+	// ------------------------------------------------------------------------
+	
+	private class TaskThread extends Thread {
+		
+		private final AbstractInvokable task;
+		
+		private volatile Throwable error;
+
+
+		TaskThread(AbstractInvokable task) {
+			super("Task Thread");
+			this.task = task;
+		}
+
+		@Override
+		public void run() {
+			try {
+				task.invoke();
+				shutdownIOManager();
+				shutdownMemoryManager();
+			}
+			catch (Throwable t) {
+				this.error = t;
+			}
+		}
+
+		public Throwable getError() {
+			return error;
+		}
+	}
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
index 45ae88f..f9b0b09 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
@@ -57,9 +57,11 @@ public class MockContext<IN, OUT> {
 
 	public static <IN, OUT> List<OUT> createAndExecute(OneInputStreamOperator<IN, OUT> operator, List<IN> inputs) {
 		MockContext<IN, OUT> mockContext = new MockContext<IN, OUT>(inputs);
-		StreamingRuntimeContext runtimeContext = new StreamingRuntimeContext("MockTask",
-				new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024), null,
-				new ExecutionConfig(), null, null, new HashMap<String, Accumulator<?, ?>>());
+		StreamingRuntimeContext runtimeContext = new StreamingRuntimeContext(
+				new MockEnvironment("MockTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024),
+				new ExecutionConfig(), 
+				null, null, 
+				new HashMap<String, Accumulator<?, ?>>());
 
 		operator.setup(mockContext.output, runtimeContext);
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 6652fde..f404d01 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -60,9 +60,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 		executionConfig = new ExecutionConfig();
 
 		StreamingRuntimeContext runtimeContext =  new StreamingRuntimeContext(
-				"MockTwoInputTask",
-				new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024),
-				getClass().getClassLoader(),
+				new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024),
 				executionConfig,
 				null,
 				new LocalStateHandle.LocalStateHandleProvider<Serializable>(),

http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
index 2d7f6b5..711dd41 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
@@ -42,8 +42,13 @@ public class SourceFunctionUtil<T> {
 	public static <T> List<T> runSourceFunction(SourceFunction<T> sourceFunction) throws Exception {
 		List<T> outputs = new ArrayList<T>();
 		if (sourceFunction instanceof RichFunction) {
-			RuntimeContext runtimeContext =  new StreamingRuntimeContext("MockTask", new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024), null,
-					new ExecutionConfig(), null, new LocalStateHandle.LocalStateHandleProvider<Serializable>(), new HashMap<String, Accumulator<?, ?>>());
+			RuntimeContext runtimeContext =  new StreamingRuntimeContext(
+					new MockEnvironment("MockTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024),
+					new ExecutionConfig(), 
+					null, 
+					new LocalStateHandle.LocalStateHandleProvider<Serializable>(),
+					new HashMap<String, Accumulator<?, ?>>());
+			
 			((RichFunction) sourceFunction).setRuntimeContext(runtimeContext);
 
 			((RichFunction) sourceFunction).open(new Configuration());

http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
index a9ebd0b..09db1f4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
@@ -66,7 +66,7 @@ public class StreamingMultipleProgramsTestBase extends TestBaseUtils {
 	// ------------------------------------------------------------------------
 	
 	public StreamingMultipleProgramsTestBase() {
-		TestStreamEnvironment clusterEnv = new TestStreamEnvironment(cluster, 4);
+		TestStreamEnvironment clusterEnv = new TestStreamEnvironment(cluster, DEFAULT_PARALLELISM);
 		clusterEnv.setAsContext();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
index 1e8b5c6..21e2e1e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.util;
 
 import org.apache.flink.api.common.ExecutionConfig;
@@ -33,7 +34,6 @@ import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 
 import java.io.Serializable;
 import java.util.HashMap;
-import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 /**
@@ -60,9 +60,7 @@ public class TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> {
 		executionConfig = new ExecutionConfig();
 
 		StreamingRuntimeContext runtimeContext =  new StreamingRuntimeContext(
-				"MockTwoInputTask",
-				new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024),
-				getClass().getClassLoader(),
+				new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024),
 				new ExecutionConfig(),
 				null,
 				new LocalStateHandle.LocalStateHandleProvider<Serializable>(),