You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2014/12/18 19:45:58 UTC

[62/82] [abbrv] incubator-flink git commit: Fixed JobManagerITCase to properly wait for task managers to deregister their tasks. Replaced the scheduler's execution service with akka's futures. Introduced TestStreamEnvironment to use ForkableFlinkMiniCluster for test execution.

Fixed JobManagerITCase to properly wait for task managers to deregister their tasks. Replaced the scheduler's execution service with akka's futures. Introduced TestStreamEnvironment to use ForkableFlinkMiniCluster for test execution.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/c175ebe8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/c175ebe8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/c175ebe8

Branch: refs/heads/master
Commit: c175ebe84f39c208d0f5ae6e09d45a76869ee86e
Parents: bd4ee47
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Nov 17 13:02:16 2014 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Dec 18 18:58:31 2014 +0100

----------------------------------------------------------------------
 .../api/avro/AvroExternalJarProgramITCase.java  |   6 +-
 .../flink-streaming-core/pom.xml                |   7 +
 .../api/environment/LocalStreamEnvironment.java |   5 -
 .../environment/StreamExecutionEnvironment.java |   2 +-
 .../flink/streaming/util/ClusterUtil.java       |   1 -
 .../apache/flink/streaming/api/IterateTest.java |   5 +-
 .../apache/flink/streaming/api/PrintTest.java   |   7 +-
 .../streaming/api/WindowCrossJoinTest.java      |   6 +-
 .../flink/streaming/api/WriteAsCsvTest.java     |   6 +-
 .../flink/streaming/api/WriteAsTextTest.java    |   6 +-
 .../api/collector/DirectedOutputTest.java       |   6 +-
 .../api/streamvertex/StreamVertexTest.java      |  11 +-
 .../streaming/util/TestStreamEnvironment.java   |  72 +++
 .../org/apache/flink/client/program/Client.java |   2 +-
 .../flink/configuration/ConfigConstants.java    |   2 +-
 .../examples/scala/graph/DeltaPageRank.scala    |   2 +-
 .../runtime/jobmanager/scheduler/Scheduler.java |  34 +-
 .../apache/flink/runtime/client/JobClient.scala |  12 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   1 -
 .../runtime/minicluster/FlinkMiniCluster.scala  |  73 +--
 .../minicluster/LocalFlinkMiniCluster.scala     | 107 +++-
 .../flink/runtime/taskmanager/TaskManager.scala |  16 +-
 .../ExecutionVertexCancelTest.java              |   5 +-
 .../ExecutionVertexDeploymentTest.java          |  10 +-
 .../ScheduleWithCoLocationHintTest.java         |  19 +
 .../scheduler/SchedulerIsolatedTasksTest.java   |  29 +-
 .../scheduler/SchedulerSlotSharingTest.java     |  32 +-
 .../ExecutionGraphRestartTest.scala             |   1 -
 .../TaskManagerLossFailsTasksTest.scala         |   1 -
 .../runtime/jobmanager/JobManagerITCase.scala   | 623 ++++++++-----------
 .../runtime/testingUtils/TestingCluster.scala   |   2 +-
 .../testingUtils/TestingJobManager.scala        |  25 +-
 .../TestingJobManagerMessages.scala             |   1 +
 .../testingUtils/TestingTaskManager.scala       |  33 +-
 .../TestingTaskManagerMessages.scala            |   4 +
 flink-scala/pom.xml                             |   4 +-
 .../api/scala/typeutils/EitherSerializer.scala  |   2 +-
 .../api/scala/typeutils/EitherTypeInfo.scala    |   2 +-
 .../api/scala/typeutils/NothingSerializer.scala |   2 +-
 .../api/scala/typeutils/OptionSerializer.scala  |   2 +-
 .../api/scala/typeutils/OptionTypeInfo.scala    |   2 +-
 .../scala/typeutils/TraversableSerializer.scala |   2 +-
 .../scala/typeutils/TraversableTypeInfo.scala   |   2 +-
 flink-test-utils/pom.xml                        | 134 ++++
 .../flink/test/util/AbstractTestBase.java       |   8 +-
 .../flink/test/util/JavaProgramTestBase.java    |   5 +-
 .../test/util/ForkableFlinkMiniCluster.scala    |  77 +++
 .../test/cancelling/CancellingTestBase.java     |   5 +-
 .../PackagedProgramEndToEndITCase.java          |   6 +-
 .../apache/flink/test/util/FailingTestBase.java |   7 +-
 .../scala/runtime/ScalaSpecialTypesITCase.scala |   2 +-
 .../ScalaSpecialTypesSerializerTest.scala       |   2 +-
 .../runtime/TraversableSerializerTest.scala     |   2 +-
 53 files changed, 878 insertions(+), 562 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
index b637e79..78a6683 100644
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
@@ -25,7 +25,7 @@ import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -39,12 +39,12 @@ public class AvroExternalJarProgramITCase {
 	@Test
 	public void testExternalProgram() {
 		
-		LocalFlinkMiniCluster testMiniCluster = null;
+		ForkableFlinkMiniCluster testMiniCluster = null;
 		
 		try {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
-			testMiniCluster = new LocalFlinkMiniCluster(config);
+			testMiniCluster = new ForkableFlinkMiniCluster(config);
 			
 			String jarFile = JAR_FILE;
 			String testData = getClass().getResource(TEST_DATA_FILE).toString();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-addons/flink-streaming/flink-streaming-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/pom.xml b/flink-addons/flink-streaming/flink-streaming-core/pom.xml
index b2bbccc..b476670 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/pom.xml
+++ b/flink-addons/flink-streaming/flink-streaming-core/pom.xml
@@ -46,6 +46,13 @@ under the License.
 			<artifactId>commons-math</artifactId>
 			<version>2.2</version>
 		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
 	</dependencies>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index 505f0e7..3e44012 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -44,9 +44,4 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 		ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(jobName),
 				getDegreeOfParallelism());
 	}
-
-	public void executeTest(long memorySize) throws Exception {
-		ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(), getDegreeOfParallelism(),
-				memorySize);
-	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 783fa28..d26b714 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -90,7 +90,7 @@ public abstract class StreamExecutionEnvironment {
 	/**
 	 * Gets the degree of parallelism with which operation are executed by
 	 * default. Operations can individually override this value to use a
-	 * specific degree of parallelism via {@link DataStream#setParallelism}.
+	 * specific degree of parallelism.
 	 * 
 	 * @return The degree of parallelism used by operations, unless they
 	 *         override that value.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
index bf5ba73..a7b7137 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
@@ -81,5 +81,4 @@ public class ClusterUtil {
 	public static void runOnMiniCluster(JobGraph jobGraph, int numOfSlots) throws Exception {
 		runOnMiniCluster(jobGraph, numOfSlots, -1);
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index 5d910ba..7804e66 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.datastream.IterativeDataStream;
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
@@ -73,7 +74,7 @@ public class IterateTest {
 
 	@Test
 	public void test() throws Exception {
-		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+		StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
 
 		env.setBufferTimeout(10);
 
@@ -86,7 +87,7 @@ public class IterateTest {
 
 		iteration.closeWith(increment).addSink(new MySink());
 
-		env.executeTest(MEMORYSIZE);
+		env.execute();
 
 		assertTrue(iterated);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
index d1d5b1e..757f6f6 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
@@ -21,8 +21,8 @@ import java.io.Serializable;
 
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.junit.Test;
 
 public class PrintTest implements Serializable {
@@ -50,9 +50,8 @@ public class PrintTest implements Serializable {
 
 	@Test
 	public void test() throws Exception {
-		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+		StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
 		env.generateSequence(1, 10).map(new IdentityMap()).filter(new FilterAll()).print();
-		env.executeTest(MEMORYSIZE);
-
+		env.execute();
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
index d8cdfa5..37f8c0a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
@@ -25,10 +25,10 @@ import java.util.ArrayList;
 import org.apache.flink.api.common.functions.CrossFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.junit.Test;
 
 public class WindowCrossJoinTest implements Serializable {
@@ -45,7 +45,7 @@ public class WindowCrossJoinTest implements Serializable {
 
 	@Test
 	public void test() throws Exception {
-		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+		StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
 		env.setBufferTimeout(1);
 
 		ArrayList<Tuple2<Integer, String>> in1 = new ArrayList<Tuple2<Integer, String>>();
@@ -111,7 +111,7 @@ public class WindowCrossJoinTest implements Serializable {
 				})
 				.addSink(new CrossResultSink());
 
-		env.executeTest(MEMORYSIZE);
+		env.execute();
 
 		assertEquals(joinExpectedResults, joinResults);
 		assertEquals(crossExpectedResults, crossResults);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
index 6b1dd5a..edd3ed5 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
@@ -28,9 +28,9 @@ import java.util.List;
 
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.Collector;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -132,7 +132,7 @@ public class WriteAsCsvTest {
 	
 	@Test
 	public void test() throws Exception {
-		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+		StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
 
 		@SuppressWarnings("unused")
 		DataStream<Tuple1<Integer>> dataStream1 = env.addSource(new MySource1()).writeAsCsv(PREFIX + "test1.txt");
@@ -159,7 +159,7 @@ public class WriteAsCsvTest {
 
 		fillExpected5();
 
-		env.executeTest(MEMORYSIZE);
+		env.execute();
 
 		readFile(PREFIX + "test1.txt", result1);
 		readFile(PREFIX + "test2.txt", result2);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
index e21f21d..1cad8a6 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
@@ -28,9 +28,9 @@ import java.util.List;
 
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.Collector;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -132,7 +132,7 @@ public class WriteAsTextTest {
 
 	@Test
 	public void test() throws Exception {
-		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+		StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
 		
 		@SuppressWarnings("unused")
 		DataStream<Tuple1<Integer>> dataStream1 = env.addSource(new MySource1()).writeAsText(PREFIX + "test1.txt");
@@ -159,7 +159,7 @@ public class WriteAsTextTest {
 
 		fillExpected5();
 
-		env.executeTest(MEMORYSIZE);
+		env.execute();
 
 		readFile(PREFIX + "test1.txt", result1);
 		readFile(PREFIX + "test2.txt", result2);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
index 4ab1be2..ad51a6c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.streaming.api.datastream.SplitDataStream;
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.junit.Test;
 
 public class DirectedOutputTest {
@@ -104,8 +105,7 @@ public class DirectedOutputTest {
 
 	@Test
 	public void outputSelectorTest() throws Exception {
-
-		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+		StreamExecutionEnvironment env = new TestStreamEnvironment(1, 128);
 
 		SplitDataStream<Long> source = env.generateSequence(1, 11).split(new MyOutputSelector());
 		source.select(EVEN).addSink(new ListSink(EVEN));
@@ -113,7 +113,7 @@ public class DirectedOutputTest {
 		source.select(EVEN, ODD).addSink(new ListSink(EVEN_AND_ODD));
 		source.selectAll().addSink(new ListSink(ALL));
 
-		env.executeTest(128);
+		env.execute();
 		assertEquals(Arrays.asList(2L, 4L, 6L, 8L, 10L), outputs.get(EVEN));
 		assertEquals(Arrays.asList(1L, 3L, 5L, 7L, 9L, 10L, 11L), outputs.get(ODD_AND_TEN));
 		assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L),

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
index 9edf44e..b103d84 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.co.CoMapFunction;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
@@ -145,8 +146,7 @@ public class StreamVertexTest {
 
 	@Test
 	public void coTest() throws Exception {
-		LocalStreamEnvironment env = StreamExecutionEnvironment
-				.createLocalEnvironment(SOURCE_PARALELISM);
+		StreamExecutionEnvironment env = new TestStreamEnvironment(SOURCE_PARALELISM, MEMORYSIZE);
 
 		DataStream<String> fromStringElements = env.fromElements("aa", "bb", "cc");
 		DataStream<Long> generatedSequence = env.generateSequence(0, 3);
@@ -154,7 +154,7 @@ public class StreamVertexTest {
 		fromStringElements.connect(generatedSequence).map(new CoMap()).addSink(new SetSink());
 
 		resultSet = new HashSet<String>();
-		env.executeTest(MEMORYSIZE);
+		env.execute();
 
 		HashSet<String> expectedSet = new HashSet<String>(Arrays.asList("aa", "bb", "cc", "0", "1",
 				"2", "3"));
@@ -163,12 +163,11 @@ public class StreamVertexTest {
 
 	@Test
 	public void runStream() throws Exception {
-		LocalStreamEnvironment env = StreamExecutionEnvironment
-				.createLocalEnvironment(SOURCE_PARALELISM);
+		StreamExecutionEnvironment env = new TestStreamEnvironment(SOURCE_PARALELISM, MEMORYSIZE);
 
 		env.addSource(new MySource()).setParallelism(SOURCE_PARALELISM).map(new MyTask()).addSink(new MySink());
 
-		env.executeTest(MEMORYSIZE);
+		env.execute();
 		assertEquals(10, data.keySet().size());
 
 		for (Integer k : data.keySet()) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
new file mode 100644
index 0000000..3918013
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import akka.actor.ActorRef;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobClient;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+
+public class TestStreamEnvironment extends StreamExecutionEnvironment {
+	private static final String DEFAULT_JOBNAME = "TestStreamingJob";
+	private static final String CANNOT_EXECUTE_EMPTY_JOB = "Cannot execute empty job";
+
+	private long memorySize;
+
+	public TestStreamEnvironment(int degreeOfParallelism, long memorySize){
+		this.setDegreeOfParallelism(degreeOfParallelism);
+
+		this.memorySize = memorySize;
+	}
+
+	@Override
+	public void execute() throws Exception {
+		execute(DEFAULT_JOBNAME);
+	}
+
+	@Override
+	public void execute(String jobName) throws Exception {
+		JobGraph jobGraph = jobGraphBuilder.getJobGraph(jobName);
+
+		Configuration configuration = jobGraph.getJobConfiguration();
+
+		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
+				getDegreeOfParallelism());
+		configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memorySize);
+
+		ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(configuration);
+
+		try{
+			ActorRef client = cluster.getJobClient();
+			JobClient.submitJobAndWait(jobGraph, false, client);
+		}catch(JobExecutionException e){
+			if(e.getMessage().contains("GraphConversionException")){
+				throw new Exception(CANNOT_EXECUTE_EMPTY_JOB, e);
+			}else{
+				throw e;
+			}
+		}finally{
+			cluster.stop();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index 203f294..45848c2 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -68,7 +68,7 @@ public class Client {
 	
 	private final PactCompiler compiler;		// the compiler to compile the jobs
 	
-	private boolean printStatusDuringExecution;
+	private boolean printStatusDuringExecution = false;
 	
 	// ------------------------------------------------------------------------
 	//                            Construction

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 047cfd1..f0ab180 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -598,7 +598,7 @@ public final class ConfigConstants {
 
 	public static String DEFAULT_AKKA_FRAMESIZE = "10485760b";
 
-	public static String DEFAULT_AKKA_LOG_LEVEL = "OFF";
+	public static String DEFAULT_AKKA_LOG_LEVEL = "ERROR";
 
 	public static int DEFAULT_AKKA_ASK_TIMEOUT = 100;
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala
index ae1c6cf..c6eb643 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index 7807b73..6495aba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -25,9 +25,11 @@ import java.util.Iterator;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Callable;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import akka.dispatch.Futures;
+import org.apache.flink.runtime.akka.AkkaUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
@@ -48,9 +50,6 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 	
 	private final Object globalLock = new Object();
 	
-	private final ExecutorService executor;
-	
-	
 	/** All instances that the scheduler can deploy to */
 	private final Set<Instance> allInstances = new HashSet<Instance>();
 	
@@ -69,13 +68,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 	
 	private int nonLocalizedAssignments;
 	
-	
 	public Scheduler() {
-		this(null);
-	}
-	
-	public Scheduler(ExecutorService executorService) {
-		this.executor = executorService;
 		this.newlyAvailableInstances = new LinkedBlockingQueue<Instance>();
 	}
 	
@@ -395,19 +388,14 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 		// that leads with a high probability to deadlocks, when scheduling fast
 		
 		this.newlyAvailableInstances.add(instance);
-		
-		if (this.executor != null) {
-			this.executor.execute(new Runnable() {
-				@Override
-				public void run() {
-					handleNewSlot();
-				}
-			});
-		}
-		else {
-			// for tests, we use the synchronous variant
-			handleNewSlot();
-		}
+
+		Futures.future(new Callable<Object>() {
+			@Override
+			public Object call() throws Exception {
+				handleNewSlot();
+				return null;
+			}
+		}, AkkaUtils.globalExecutionContext());
 	}
 	
 	private void handleNewSlot() {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
index 0d9f672..53f0daa 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.client
 
 import java.io.IOException
 import java.net.InetSocketAddress
+import java.util.concurrent.TimeUnit
 
 import akka.actor.Status.Failure
 import akka.actor._
@@ -83,11 +84,13 @@ object JobClient{
   def startActorSystemAndActor(config: Configuration): (ActorSystem, ActorRef) = {
     implicit val actorSystem = AkkaUtils.createActorSystem(host = "localhost",
       port =0, configuration = config)
+
     (actorSystem, startActorWithConfiguration(config))
   }
 
-  def startActor(jobManagerURL: String)(implicit actorSystem: ActorSystem): ActorRef = {
-    actorSystem.actorOf(Props(classOf[JobClient], jobManagerURL), JOB_CLIENT_NAME)
+  def startActor(jobManagerURL: String)(implicit actorSystem: ActorSystem, timeout: FiniteDuration):
+  ActorRef = {
+    actorSystem.actorOf(Props(classOf[JobClient], jobManagerURL, timeout), JOB_CLIENT_NAME)
   }
 
   def parseConfiguration(configuration: Configuration): String = {
@@ -109,7 +112,10 @@ object JobClient{
   }
 
   def startActorWithConfiguration(config: Configuration)(implicit actorSystem: ActorSystem):
-  ActorRef= {
+  ActorRef = {
+    implicit val timeout = FiniteDuration(config.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT,
+      ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS)
+
     startActor(parseConfiguration(config))
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 822a34c..a72c685 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -421,7 +421,6 @@ object JobManager {
     }
 
     jobManagerSystem.awaitTermination()
-    println("Shutting down.")
   }
 
   def parseArgs(args: Array[String]): (String, Int, Configuration, ExecutionMode) = {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 43be786..1c5a9df 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -22,12 +22,9 @@ import java.util.concurrent.TimeUnit
 
 import akka.pattern.ask
 import akka.actor.{ActorRef, ActorSystem}
-import org.apache.flink.api.common.io.FileOutputFormat
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.client.JobClient
 import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
-import org.apache.flink.runtime.util.EnvironmentInformation
 import org.slf4j.LoggerFactory
 
 import scala.concurrent.duration.FiniteDuration
@@ -36,6 +33,8 @@ import scala.concurrent.{Future, Await}
 abstract class FlinkMiniCluster(userConfiguration: Configuration) {
   import FlinkMiniCluster._
 
+  val HOSTNAME = "localhost"
+
   implicit val timeout = FiniteDuration(userConfiguration.getInteger(ConfigConstants
     .AKKA_ASK_TIMEOUT, ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS)
 
@@ -54,8 +53,6 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration) {
 
   val (taskManagerActorSystems, taskManagerActors) = actorSystemsTaskManagers.unzip
 
-  val jobClientActorSystem = AkkaUtils.createActorSystem()
-
   waitForTaskManagersToBeRegistered()
 
   def generateConfiguration(userConfiguration: Configuration): Configuration
@@ -79,21 +76,6 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration) {
       configuration)
   }
 
-  def getJobClient(): ActorRef ={
-    val config = new Configuration()
-
-    config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, FlinkMiniCluster.HOSTNAME)
-    config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, getJobManagerRPCPort)
-
-    JobClient.startActorWithConfiguration(config)(jobClientActorSystem)
-  }
-
-  def getJobClientActorSystem: ActorSystem = jobClientActorSystem
-
-  def getJobManagerRPCPort: Int = {
-    configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1)
-  }
-
   def getJobManager: ActorRef = {
     jobManagerActor
   }
@@ -116,13 +98,11 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration) {
   def shutdown(): Unit = {
     taskManagerActorSystems foreach { _.shutdown() }
     jobManagerActorSystem.shutdown()
-    jobClientActorSystem.shutdown()
   }
 
   def awaitTermination(): Unit = {
-    jobClientActorSystem.awaitTermination()
-    taskManagerActorSystems foreach { _.awaitTermination()}
     jobManagerActorSystem.awaitTermination()
+    taskManagerActorSystems foreach { _.awaitTermination()}
   }
 
   def waitForTaskManagersToBeRegistered(): Unit = {
@@ -138,51 +118,4 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration) {
 
 object FlinkMiniCluster{
   val LOG = LoggerFactory.getLogger(classOf[FlinkMiniCluster])
-  val HOSTNAME = "localhost"
-
-  def initializeIOFormatClasses(configuration: Configuration): Unit = {
-    try{
-      val om = classOf[FileOutputFormat[_]].getDeclaredMethod("initDefaultsFromConfiguration",
-        classOf[Configuration])
-      om.setAccessible(true)
-      om.invoke(null, configuration)
-    }catch {
-      case e: Exception =>
-        LOG.error("Cannot (re) initialize the globally loaded defaults. Some classes might not " +
-          "follow the specified default behaviour.")
-    }
-  }
-
-  def getDefaultConfig: Configuration = {
-    val config: Configuration = new Configuration
-    config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, HOSTNAME)
-    config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants
-      .DEFAULT_JOB_MANAGER_IPC_PORT)
-    config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, ConfigConstants
-      .DEFAULT_TASK_MANAGER_IPC_PORT)
-    config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, ConfigConstants
-      .DEFAULT_TASK_MANAGER_DATA_PORT)
-    config.setBoolean(ConfigConstants.TASK_MANAGER_MEMORY_LAZY_ALLOCATION_KEY, ConfigConstants
-      .DEFAULT_TASK_MANAGER_MEMORY_LAZY_ALLOCATION)
-    config.setInteger(ConfigConstants.JOBCLIENT_POLLING_INTERVAL_KEY, ConfigConstants
-      .DEFAULT_JOBCLIENT_POLLING_INTERVAL)
-    config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, ConfigConstants
-      .DEFAULT_FILESYSTEM_OVERWRITE)
-    config.setBoolean(ConfigConstants.FILESYSTEM_OUTPUT_ALWAYS_CREATE_DIRECTORY_KEY,
-      ConfigConstants.DEFAULT_FILESYSTEM_ALWAYS_CREATE_DIRECTORY)
-    var memorySize: Long = EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag
-    val bufferMem: Long = ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS *
-      ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE
-    val numTaskManager = 1
-    val taskManagerNumSlots: Int = ConfigConstants.DEFAULT_TASK_MANAGER_NUM_TASK_SLOTS
-    memorySize = memorySize - (bufferMem * numTaskManager)
-    memorySize = (memorySize * ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION).toLong
-    memorySize >>>= 20
-    memorySize /= numTaskManager
-    config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memorySize)
-    config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManager)
-    config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, taskManagerNumSlots)
-    config
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index edf16bb..32fdb37 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -19,41 +19,29 @@
 package org.apache.flink.runtime.minicluster
 
 import akka.actor.{ActorRef, ActorSystem}
+import org.apache.flink.api.common.io.FileOutputFormat
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.client.JobClient
 import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.taskmanager.TaskManager
+import org.apache.flink.runtime.util.EnvironmentInformation
 import org.slf4j.LoggerFactory
 
 class LocalFlinkMiniCluster(userConfiguration: Configuration) extends
 FlinkMiniCluster(userConfiguration){
+  import LocalFlinkMiniCluster._
 
-  override def generateConfiguration(userConfiguration: Configuration): Configuration = {
-    val forNumberString = System.getProperty("forkNumber")
-
-    val forkNumber = try {
-      Integer.parseInt(forNumberString)
-    }catch{
-      case e: NumberFormatException => -1
-    }
+  val jobClientActorSystem = AkkaUtils.createActorSystem()
 
-    val config = FlinkMiniCluster.getDefaultConfig
+  override def generateConfiguration(userConfiguration: Configuration): Configuration = {
+    val config = getDefaultConfig
 
     config.addAll(userConfiguration)
 
-    if(forkNumber != -1){
-      val jobManagerRPC = 1024 + forkNumber*300
-      val taskManagerRPC = 1024 + forkNumber*300 + 100
-      val taskManagerData = 1024 + forkNumber*300 + 200
+    setMemory(config)
 
-      config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerRPC)
-      config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, taskManagerRPC)
-      config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, taskManagerData)
-
-    }
-
-    FlinkMiniCluster.initializeIOFormatClasses(config)
+    initializeIOFormatClasses(config)
 
     config
   }
@@ -80,7 +68,84 @@ FlinkMiniCluster(userConfiguration){
       config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort + index)
     }
 
-    TaskManager.startActorWithConfiguration(FlinkMiniCluster.HOSTNAME, config, false)(system)
+    TaskManager.startActorWithConfiguration(HOSTNAME, config, false)(system)
+  }
+
+  def getJobClient(): ActorRef ={
+    val config = new Configuration()
+
+    config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, HOSTNAME)
+    config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, getJobManagerRPCPort)
+
+
+    JobClient.startActorWithConfiguration(config)(jobClientActorSystem)
+  }
+
+  def getJobClientActorSystem: ActorSystem = jobClientActorSystem
+
+  def getJobManagerRPCPort: Int = {
+    configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1)
+  }
+
+  override def shutdown(): Unit = {
+    super.shutdown()
+    jobClientActorSystem.shutdown()
+  }
+
+  override def awaitTermination(): Unit = {
+    jobClientActorSystem.awaitTermination()
+    super.awaitTermination()
+  }
+
+  def initializeIOFormatClasses(configuration: Configuration): Unit = {
+    try{
+      val om = classOf[FileOutputFormat[_]].getDeclaredMethod("initDefaultsFromConfiguration",
+        classOf[Configuration])
+      om.setAccessible(true)
+      om.invoke(null, configuration)
+    }catch {
+      case e: Exception =>
+        LOG.error("Cannot (re) initialize the globally loaded defaults. Some classes might not " +
+          "follow the specified default behaviour.")
+    }
+  }
+
+  def setMemory(config: Configuration): Unit = {
+    var memorySize: Long = EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag
+    val bufferMem: Long = ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS *
+      ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE
+    val numTaskManager = config.getInteger(ConfigConstants
+      .LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1)
+    val taskManagerNumSlots: Int = ConfigConstants.DEFAULT_TASK_MANAGER_NUM_TASK_SLOTS
+    memorySize = memorySize - (bufferMem * numTaskManager)
+    memorySize = (memorySize * ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION).toLong
+    memorySize >>>= 20
+    memorySize /= numTaskManager
+    config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memorySize)
+  }
+
+  def getDefaultConfig: Configuration = {
+    val config: Configuration = new Configuration
+    config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, HOSTNAME)
+    config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants
+      .DEFAULT_JOB_MANAGER_IPC_PORT)
+    config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, ConfigConstants
+      .DEFAULT_TASK_MANAGER_IPC_PORT)
+    config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, ConfigConstants
+      .DEFAULT_TASK_MANAGER_DATA_PORT)
+    config.setBoolean(ConfigConstants.TASK_MANAGER_MEMORY_LAZY_ALLOCATION_KEY, ConfigConstants
+      .DEFAULT_TASK_MANAGER_MEMORY_LAZY_ALLOCATION)
+    config.setInteger(ConfigConstants.JOBCLIENT_POLLING_INTERVAL_KEY, ConfigConstants
+      .DEFAULT_JOBCLIENT_POLLING_INTERVAL)
+    config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, ConfigConstants
+      .DEFAULT_FILESYSTEM_OVERWRITE)
+    config.setBoolean(ConfigConstants.FILESYSTEM_OUTPUT_ALWAYS_CREATE_DIRECTORY_KEY,
+      ConfigConstants.DEFAULT_FILESYSTEM_ALWAYS_CREATE_DIRECTORY)
+    config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1)
+    config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1)
+    config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, ConfigConstants
+      .DEFAULT_TASK_MANAGER_NUM_TASK_SLOTS)
+    config
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 7004881..9360a14 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -290,7 +290,6 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
       } catch {
         case t: Throwable =>
           log.error(t, s"Could not instantiate task with execution ID ${executionID}.")
-
           runningTasks.remove(executionID)
 
           for (entry <- DistributedCache.readFileInfoFromConfig(tdd.getJobConfiguration)) {
@@ -336,9 +335,11 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
       log.info(s"Unregister task with execution ID ${executionID}.")
       runningTasks.remove(executionID) match {
         case Some(task) =>
-          for (entry <- DistributedCache.readFileInfoFromConfig(task.getEnvironment
-            .getJobConfiguration)) {
-            fileCache.deleteTmpFile(entry.getKey, entry.getValue, task.getJobID)
+          if(task.getEnvironment != null) {
+            for (entry <- DistributedCache.readFileInfoFromConfig(task.getEnvironment
+              .getJobConfiguration)) {
+              fileCache.deleteTmpFile(entry.getKey, entry.getValue, task.getJobID)
+            }
           }
 
           channelManager foreach {
@@ -377,6 +378,10 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
 
     val receiver = this.self
 
+    val taskName = runningTasks(executionID).getTaskName
+    val numberOfSubtasks = runningTasks(executionID).getNumberOfSubtasks
+    val indexOfSubtask = runningTasks(executionID).getSubtaskIndex
+
     futureResponse.mapTo[Boolean].onComplete {
       case Success(result) =>
         if (!result || executionState == ExecutionState.FINISHED || executionState ==
@@ -384,7 +389,8 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
           receiver ! UnregisterTask(executionID)
         }
       case Failure(t) =>
-        log.error(t, "Execution state change notification failed.")
+        log.error(t, s"Execution state change notification failed for task ${executionID} " +
+          s"($indexOfSubtask/$numberOfSubtasks) of job ${jobID}.")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index 689d10a..207be1a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -135,8 +135,9 @@ public class ExecutionVertexCancelTest {
 				setVertexState(vertex, ExecutionState.SCHEDULED);
 				assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState());
 
-				ActorRef taskManager = system.actorOf(Props.create(new CancelSequenceTaskManagerCreator(new
-						TaskOperationResult(execId, true), new TaskOperationResult(execId, false))));
+				ActorRef taskManager = TestActorRef.create(system, Props.create(new
+						CancelSequenceTaskManagerCreator(new TaskOperationResult(execId, true),
+						new TaskOperationResult(execId, false))));
 
 				Instance instance = getInstance(taskManager);
 				AllocatedSlot slot = instance.allocateSlot(new JobID());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index 4b3bbbf..e5c41b3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -55,9 +55,13 @@ public class ExecutionVertexDeploymentTest {
 	public void testDeployCall() {
 		try {
 			final JobVertexID jid = new JobVertexID();
+
+			TestingUtils.setCallingThreadDispatcher(system);
+			ActorRef tm = TestActorRef.create(system, Props.create(SimpleAcknowledgingTaskManager
+					.class));
 			
 			// mock taskmanager to simply accept the call
-			Instance instance = getInstance(ActorRef.noSender());
+			Instance instance = getInstance(tm);
 
 			final AllocatedSlot slot = instance.allocateSlot(new JobID());
 			
@@ -67,7 +71,7 @@ public class ExecutionVertexDeploymentTest {
 			
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 			vertex.deployToSlot(slot);
-			assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
+			assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
 			
 			// no repeated scheduling
 			try {
@@ -84,6 +88,8 @@ public class ExecutionVertexDeploymentTest {
 		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
+		}finally{
+			TestingUtils.setGlobalExecutionContext();
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
index 5234992..b22ccd0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
@@ -26,13 +26,32 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
 import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class ScheduleWithCoLocationHintTest {
 
+	private static ActorSystem system;
+
+	@BeforeClass
+	public static void setup(){
+		system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig());
+		TestingUtils.setCallingThreadDispatcher(system);
+	}
+
+	@AfterClass
+	public static void teardown(){
+		TestingUtils.setGlobalExecutionContext();
+		JavaTestKit.shutdownActorSystem(system);
+	}
+
 	@Test
 	public void scheduleAllSharedAndCoLocated() {
 		try {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
index ad040f7..240cdac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
@@ -24,6 +24,11 @@ import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.g
 import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getRandomInstance;
 import static org.junit.Assert.*;
 
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -46,6 +51,19 @@ import org.apache.flink.runtime.util.ExecutorThreadFactory;
  * Tests for the {@link Scheduler} when scheduling individual tasks.
  */
 public class SchedulerIsolatedTasksTest {
+	private static ActorSystem system;
+
+	@BeforeClass
+	public static void setup(){
+		system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig());
+		TestingUtils.setCallingThreadDispatcher(system);
+	}
+
+	@AfterClass
+	public static void teardown(){
+		TestingUtils.setGlobalExecutionContext();
+		JavaTestKit.shutdownActorSystem(system);
+	}
 	
 	@Test
 	public void testAddAndRemoveInstance() {
@@ -182,13 +200,13 @@ public class SchedulerIsolatedTasksTest {
 		final int NUM_INSTANCES = 50;
 		final int NUM_SLOTS_PER_INSTANCE = 3;
 		final int NUM_TASKS_TO_SCHEDULE = 2000;
-		
-		final ExecutorService executor = Executors.newFixedThreadPool(4, ExecutorThreadFactory.INSTANCE);
+
+		TestingUtils.setGlobalExecutionContext();
 		
 		try {
 			// note: since this test asynchronously releases slots, the executor needs release workers.
 			// doing the release call synchronous can lead to a deadlock
-			Scheduler scheduler = new Scheduler(executor);
+			Scheduler scheduler = new Scheduler();
 			
 			for (int i = 0;i < NUM_INSTANCES; i++) {
 				scheduler.newInstanceAvailable(getRandomInstance((int) (Math.random() * NUM_SLOTS_PER_INSTANCE) + 1));
@@ -274,9 +292,8 @@ public class SchedulerIsolatedTasksTest {
 		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
-		}
-		finally {
-			executor.shutdownNow();
+		}finally{
+			TestingUtils.setCallingThreadDispatcher(system);
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
index ea32065..de90701 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
@@ -31,6 +31,11 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
@@ -40,6 +45,20 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
  * Tests for the scheduler when scheduling tasks in slot sharing groups.
  */
 public class SchedulerSlotSharingTest {
+	private static ActorSystem system;
+
+	@BeforeClass
+	public static void setup(){
+		system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig());
+		TestingUtils.setCallingThreadDispatcher(system);
+	}
+
+	@AfterClass
+	public static void teardown(){
+		TestingUtils.setGlobalExecutionContext();
+		JavaTestKit.shutdownActorSystem(system);
+	}
+
 	
 	@Test
 	public void scheduleSingleVertexType() {
@@ -776,7 +795,7 @@ public class SchedulerSlotSharingTest {
 	
 	@Test
 	public void testSequentialAllocateAndRelease() {
-		final ExecutorService exec = Executors.newFixedThreadPool(8);
+		TestingUtils.setGlobalExecutionContext();
 		try {
 			final JobVertexID jid1 = new JobVertexID();
 			final JobVertexID jid2 = new JobVertexID();
@@ -785,7 +804,7 @@ public class SchedulerSlotSharingTest {
 			
 			final SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2, jid3, jid4);
 			
-			final Scheduler scheduler = new Scheduler(exec);
+			final Scheduler scheduler = new Scheduler();
 			scheduler.newInstanceAvailable(getRandomInstance(4));
 			
 			// allocate something from group 1 and 2 interleaved with schedule for group 3
@@ -834,15 +853,15 @@ public class SchedulerSlotSharingTest {
 		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
-		}
-		finally {
-			exec.shutdownNow();
+		}finally{
+			TestingUtils.setCallingThreadDispatcher(system);
 		}
 	}
 	
 	@Test
 	public void testConcurrentAllocateAndRelease() {
 		final ExecutorService executor = Executors.newFixedThreadPool(20);
+		TestingUtils.setGlobalExecutionContext();
 		try {
 			for (int run = 0; run < 50; run++) {
 				final JobVertexID jid1 = new JobVertexID();
@@ -852,7 +871,7 @@ public class SchedulerSlotSharingTest {
 				
 				final SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2, jid3, jid4);
 				
-				final Scheduler scheduler = new Scheduler(executor);
+				final Scheduler scheduler = new Scheduler();
 				scheduler.newInstanceAvailable(getRandomInstance(4));
 				
 				final AtomicInteger enumerator1 = new AtomicInteger();
@@ -1012,6 +1031,7 @@ public class SchedulerSlotSharingTest {
 		}
 		finally {
 			executor.shutdownNow();
+			TestingUtils.setCallingThreadDispatcher(system);
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
index f4ce7b0..c0c052a 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.executiongraph
 import akka.actor.{Props, ActorSystem}
 import akka.testkit.{TestKit}
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils
 import org.apache.flink.runtime.jobgraph.{JobStatus, JobID, JobGraph, AbstractJobVertex}
 import org.apache.flink.runtime.jobmanager.Tasks
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c175ebe8/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
index 9884bca..97625d9 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.executiongraph
 import akka.actor.{Props, ActorSystem}
 import akka.testkit.TestKit
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionGraphTestUtils}
 import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils
 .SimpleAcknowledgingTaskManager
 import org.apache.flink.runtime.jobgraph.{JobStatus, JobID, JobGraph, AbstractJobVertex}