You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/05 01:59:54 UTC

[05/10] flink git commit: [FLINK-5141] [runtime] Add 'waitUntilTaskManagerRegistrationsComplete()' to MiniCluster

[FLINK-5141] [runtime] Add 'waitUntilTaskManagerRegistrationsComplete()' to MiniCluster


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/82c1fcfa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/82c1fcfa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/82c1fcfa

Branch: refs/heads/flip-6
Commit: 82c1fcfa1f34b963f45146830d51b1490b0dc1e3
Parents: c0086b5
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 30 17:35:47 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Dec 5 02:49:43 2016 +0100

----------------------------------------------------------------------
 .../leaderelection/LeaderAddressAndId.java      | 73 +++++++++++++++++
 .../flink/runtime/minicluster/MiniCluster.java  | 58 ++++++++++++-
 .../minicluster/MiniClusterJobDispatcher.java   |  2 +-
 .../OneTimeLeaderListenerFuture.java            | 60 ++++++++++++++
 .../resourcemanager/ResourceManager.java        | 11 +++
 .../resourcemanager/ResourceManagerGateway.java |  8 ++
 .../runtime/minicluster/MiniClusterITCase.java  |  8 ++
 .../Flip6LocalStreamEnvironment.java            | 23 +++---
 .../LocalStreamEnvironmentITCase.java           | 81 +++++++++++++++++++
 .../flink/core/testutils/CheckedThread.java     | 85 ++++++++++++++++++++
 10 files changed, 392 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderAddressAndId.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderAddressAndId.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderAddressAndId.java
new file mode 100644
index 0000000..23cd34b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderAddressAndId.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An immutable pair of a leader address and a leader ID; the constructor rejects null for either.
+ */
+public class LeaderAddressAndId {
+
+	private final String leaderAddress;
+	private final UUID leaderId;
+
+	public LeaderAddressAndId(String leaderAddress, UUID leaderId) {
+		this.leaderAddress = checkNotNull(leaderAddress);
+		this.leaderId = checkNotNull(leaderId);
+	}
+
+	// ------------------------------------------------------------------------
+
+	public String leaderAddress() {
+		return leaderAddress;
+	}
+
+	public UUID leaderId() {
+		return leaderId;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public int hashCode() {
+		return 31 * leaderAddress.hashCode()+ leaderId.hashCode();
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		else if (o != null && o.getClass() == LeaderAddressAndId.class) {
+			final LeaderAddressAndId that = (LeaderAddressAndId) o;
+			return this.leaderAddress.equals(that.leaderAddress) && this.leaderId.equals(that.leaderId);
+		}
+		else {
+			return false;
+		}
+	}
+
+	@Override
+	public String toString() {
+		return "LeaderAddressAndId (" + leaderAddress + " / " + leaderId + ')';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 3ede5b5..1b9f265 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -27,11 +27,15 @@ import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.leaderelection.LeaderAddressAndId;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerRunner;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
@@ -169,6 +173,7 @@ public class MiniCluster {
 			final boolean singleRpc = config.getUseSingleRpcSystem();
 
 			try {
+				LOG.info("Starting Metrics Registry");
 				metricRegistry = createMetricRegistry(configuration);
 
 				RpcService[] jobManagerRpcServices = new RpcService[numJobManagers];
@@ -176,10 +181,12 @@ public class MiniCluster {
 				RpcService[] resourceManagerRpcServices = new RpcService[numResourceManagers];
 
 				// bring up all the RPC services
-				if (singleRpc) {
-					// one common RPC for all
-					commonRpcService = createRpcService(configuration, rpcTimeout, false, null);
+				LOG.info("Starting RPC Service(s)");
+
+				// we always need the 'commonRpcService' for auxiliary calls
+				commonRpcService = createRpcService(configuration, rpcTimeout, false, null);
 
+				if (singleRpc) {
 					// set that same RPC service for all JobManagers and TaskManagers
 					for (int i = 0; i < numJobManagers; i++) {
 						jobManagerRpcServices[i] = commonRpcService;
@@ -236,7 +243,7 @@ public class MiniCluster {
 						configuration, haServices, metricRegistry, numTaskManagers, taskManagerRpcServices);
 
 				// bring up the dispatcher that launches JobManagers when jobs submitted
-				LOG.info("Starting job dispatcher for {} JobManger(s)", numJobManagers);
+				LOG.info("Starting job dispatcher(s) for {} JobManger(s)", numJobManagers);
 				jobDispatcher = new MiniClusterJobDispatcher(
 						configuration, haServices, metricRegistry, numJobManagers, jobManagerRpcServices);
 			}
@@ -357,6 +364,49 @@ public class MiniCluster {
 		}
 	}
 
+	/**
+	 * Blocks until the leading ResourceManager reports at least as many TaskManagers as there are runners.
+	 *
+	 * @throws Exception if the mini cluster is not running, or if leader retrieval or an RPC call fails
+	 */
+	public void waitUntilTaskManagerRegistrationsComplete() throws Exception {
+		LeaderRetrievalService rmMasterListener = null;
+		Future<LeaderAddressAndId> addressAndIdFuture;
+		try {
+			synchronized (lock) {
+				checkState(running, "FlinkMiniCluster is not running");
+				OneTimeLeaderListenerFuture listenerFuture = new OneTimeLeaderListenerFuture();
+				rmMasterListener = haServices.getResourceManagerLeaderRetriever();
+				rmMasterListener.start(listenerFuture);
+				addressAndIdFuture = listenerFuture.future();
+			}
+			// block for the first leader notification outside the lock
+			final LeaderAddressAndId addressAndId = addressAndIdFuture.get();
+			final ResourceManagerGateway resourceManager =
+					commonRpcService.connect(addressAndId.leaderAddress(), ResourceManagerGateway.class).get();
+			final int numTaskManagersToWaitFor = taskManagerRunners.length;
+
+			// poll and wait until enough TaskManagers are available
+			while (true) {
+				int numTaskManagersAvailable =
+						resourceManager.getNumberOfRegisteredTaskManagers(addressAndId.leaderId()).get();
+				if (numTaskManagersAvailable >= numTaskManagersToWaitFor) {
+					break;
+				}
+				Thread.sleep(2);
+			}
+		}
+		finally {
+			try {
+				if (rmMasterListener != null) {
+					rmMasterListener.stop();
+				}
+			} catch (Exception e) {
+				LOG.warn("Error shutting down leader listener for ResourceManager", e);
+			}
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  running jobs
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
index 8ac8eba..7fffaee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
@@ -143,7 +143,7 @@ public class MiniClusterJobDispatcher {
 			if (!shutdown) {
 				shutdown = true;
 
-				LOG.info("Shutting down the dispatcher");
+				LOG.info("Shutting down the job dispatcher");
 
 				// in this shutdown code we copy the references to the stack first,
 				// to avoid concurrent modification

http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/OneTimeLeaderListenerFuture.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/OneTimeLeaderListenerFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/OneTimeLeaderListenerFuture.java
new file mode 100644
index 0000000..b0157d8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/OneTimeLeaderListenerFuture.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.minicluster;
+
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.leaderelection.LeaderAddressAndId;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+
+import java.util.UUID;
+
+/**
+ * A leader listener that exposes a future for the first leader notification.
+ *
+ * <p>The future can be obtained via the {@link #future()} method.
+ */
+public class OneTimeLeaderListenerFuture implements LeaderRetrievalListener {
+
+	private final FlinkCompletableFuture<LeaderAddressAndId> future;
+
+	public OneTimeLeaderListenerFuture() {
+		this.future = new FlinkCompletableFuture<>();
+	}
+
+	/**
+	 * Gets the future that is completed with the leader address and ID, or exceptionally with a reported error.
+	 * @return The future; the same instance is returned on every call.
+	 */
+	public FlinkFuture<LeaderAddressAndId> future() {
+		return future;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
+		future.complete(new LeaderAddressAndId(leaderAddress, leaderSessionID));
+	}
+
+	@Override
+	public void handleError(Exception exception) {
+		future.completeExceptionally(exception);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 145cc40..76b4a86 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.LeaderIdMismatchException;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
@@ -502,6 +503,16 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 		shutDownApplication(finalStatus, optionalDiagnostics);
 	}
 
+	@RpcMethod
+	public Integer getNumberOfRegisteredTaskManagers(UUID leaderSessionId) throws LeaderIdMismatchException {
+		if (this.leaderSessionId != null && this.leaderSessionId.equals(leaderSessionId)) {
+			return taskExecutors.size();
+		}
+		else { // no current leader session ID, or the caller addressed a different one
+			throw new LeaderIdMismatchException(this.leaderSessionId, leaderSessionId);
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  Testing methods
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 0a37bb9..8235ea7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -122,4 +122,12 @@ public interface ResourceManagerGateway extends RpcGateway {
 	 * @param optionalDiagnostics
 	 */
 	void shutDownCluster(final ApplicationStatus finalStatus, final String optionalDiagnostics);
+
+	/**
+	 * Gets the currently registered number of TaskManagers.
+	 * 
+	 * @param leaderSessionId The leader session ID with which to address the ResourceManager.
+	 * @return The future to the number of registered TaskManagers.
+	 */
+	Future<Integer> getNumberOfRegisteredTaskManagers(UUID leaderSessionId);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index 2cf2d4d..d9a1896 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -33,6 +33,10 @@ import org.junit.Test;
  */
 public class MiniClusterITCase extends TestLogger {
 
+	// ------------------------------------------------------------------------
+	//  Simple Job Running Tests
+	// ------------------------------------------------------------------------
+
 	@Test
 	public void runJobWithSingleRpcService() throws Exception {
 		MiniClusterConfiguration cfg = new MiniClusterConfiguration();
@@ -63,6 +67,10 @@ public class MiniClusterITCase extends TestLogger {
 		executeJob(miniCluster);
 	}
 
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
 	private static void executeJob(MiniCluster miniCluster) throws Exception {
 		miniCluster.start();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
index a0c128e..2007d35 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
@@ -18,7 +18,6 @@
 package org.apache.flink.streaming.api.environment;
 
 import org.apache.flink.annotation.Public;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -30,6 +29,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.streaming.api.graph.StreamGraph;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,8 +67,9 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
 					"The Flip6LocalStreamEnvironment cannot be used when submitting a program through a client, " +
 							"or running in a TestEnvironment context.");
 		}
-		
+
 		this.conf = config == null ? new Configuration() : config;
+		setParallelism(1);
 	}
 
 	/**
@@ -85,17 +86,12 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
 		StreamGraph streamGraph = getStreamGraph();
 		streamGraph.setJobName(jobName);
 
-		JobGraph jobGraph = streamGraph.getJobGraph();
+		// TODO - temp fix to enforce restarts due to a bug in the allocation protocol
+		streamGraph.getExecutionConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 5));
 
+		JobGraph jobGraph = streamGraph.getJobGraph();
 		jobGraph.setAllowQueuedScheduling(true);
 
-		// As jira FLINK-5140 described,
-		// we have to set restart strategy to handle NoResourceAvailableException.
-		ExecutionConfig executionConfig = new ExecutionConfig();
-		executionConfig.setRestartStrategy(
-			RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000));
-		jobGraph.setExecutionConfig(executionConfig);
-
 		Configuration configuration = new Configuration();
 		configuration.addAll(jobGraph.getJobConfiguration());
 		configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
@@ -105,7 +101,8 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
 
 		MiniClusterConfiguration cfg = new MiniClusterConfiguration(configuration);
 
-		// Currently we do not reuse slot anymore, so we need to sum all parallelism of vertices up.
+		// Currently we do not reuse slot anymore,
+		// so we need to sum up the parallelism of all vertices
 		int slotsCount = 0;
 		for (JobVertex jobVertex : jobGraph.getVertices()) {
 			slotsCount += jobVertex.getParallelism();
@@ -119,8 +116,10 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
 		MiniCluster miniCluster = new MiniCluster(cfg);
 		try {
 			miniCluster.start();
+			miniCluster.waitUntilTaskManagerRegistrationsComplete();
 			return miniCluster.runJobBlocking(jobGraph);
-		} finally {
+		}
+		finally {
 			transformations.clear();
 			miniCluster.shutdown();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java
new file mode 100644
index 0000000..a360d0e
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.environment;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+@SuppressWarnings("serial")
+public class LocalStreamEnvironmentITCase {
+
+	/**
+	 * This test verifies that the execution environment can be used to execute a
+	 * single job with multiple slots.
+	 */
+	@Test
+	public void testRunIsolatedJob() throws Exception {
+		Flip6LocalStreamEnvironment env = new Flip6LocalStreamEnvironment();
+		assertEquals(1, env.getParallelism());
+
+		addSmallBoundedJob(env, 3);
+		env.execute();
+	}
+
+	/**
+	 * This test verifies that the execution environment can be used to execute multiple
+	 * bounded streaming jobs one after another.
+	 */
+	@Test
+	public void testMultipleJobsAfterAnother() throws Exception {
+		Flip6LocalStreamEnvironment env = new Flip6LocalStreamEnvironment();
+
+		addSmallBoundedJob(env, 3);
+		env.execute();
+
+		addSmallBoundedJob(env, 5);
+		env.execute();
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static void addSmallBoundedJob(StreamExecutionEnvironment env, int parallelism) {
+		DataStream<Long> stream = env
+				.generateSequence(1, 100)
+					.setParallelism(parallelism)
+					.slotSharingGroup("group_1");
+
+		stream
+				.filter(new FilterFunction<Long>() {
+					@Override
+					public boolean filter(Long value) {
+						return false;
+					}
+				})
+					.setParallelism(parallelism)
+					.startNewChain()
+					.slotSharingGroup("group_2")
+
+				.print()
+					.setParallelism(parallelism);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
new file mode 100644
index 0000000..aedbb5c
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.testutils;
+
+/**
+ * A thread that additionally catches exceptions and offers a joining method that
+ * re-throws the exceptions.
+ *
+ * <p>Rather than overriding {@link Thread#run()} (or supplying a {@link Runnable}), one
+ * needs to extend this class and implement the {@link #go()} method. That method may
+ * throw exceptions.
+ *
+ * <p>Exceptions from the {@link #go()} method are caught and re-thrown when joining this
+ * thread via the {@link #sync()} method.
+ */
+public abstract class CheckedThread extends Thread {
+
+	/** The error thrown from the main work method; stays null if {@link #go()} threw nothing. */
+	private volatile Throwable error;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This method needs to be overridden to contain the main work logic.
+	 * It takes the role of {@link Thread#run()}, but should propagate exceptions.
+	 *
+	 * @throws Exception The exceptions thrown here will be re-thrown in the {@link #sync()} method.
+	 */
+	public abstract void go() throws Exception;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This method is final - thread work should go into the {@link #go()} method instead.
+	 */
+	@Override
+	public final void run() {
+		try {
+			go();
+		}
+		catch (Throwable t) {
+			error = t;
+		}
+	}
+
+	/**
+	 * Waits until the thread is completed and checks whether any error occurred during
+	 * the execution.
+	 *
+	 * <p>This method blocks like {@link #join()}, but performs an additional check for
+	 * exceptions thrown from the {@link #go()} method.
+	 */
+	public void sync() throws Exception {
+		super.join();
+
+		// propagate the error
+		if (error != null) {
+			if (error instanceof Error) {
+				throw (Error) error;
+			}
+			else if (error instanceof Exception) {
+				throw (Exception) error;
+			}
+			else {
+				throw new Exception(error.getMessage(), error);
+			}
+		}
+	}
+}