Posted to commits@flink.apache.org by se...@apache.org on 2015/04/13 18:23:31 UTC

[1/5] flink git commit: [FLINK-1878] [APIs] Environments accept a flag that controls sysout logging during execution.

Repository: flink
Updated Branches:
  refs/heads/master f81d9f022 -> 69a400fad


[FLINK-1878] [APIs] Environments accept a flag that controls sysout logging during execution.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e79813bf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e79813bf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e79813bf

Branch: refs/heads/master
Commit: e79813bfdb9044dd6b533abf8345249e8ceba04f
Parents: 0e6c9d4
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Apr 13 16:23:40 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Apr 13 16:38:21 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/LocalExecutor.java  | 28 +++-------
 .../org/apache/flink/client/RemoteExecutor.java | 54 +++++++++++++-------
 .../flink/api/common/ExecutionConfig.java       | 34 +++++++++++-
 .../apache/flink/api/common/PlanExecutor.java   | 43 +++++++++++++---
 .../apache/flink/api/java/LocalEnvironment.java |  1 +
 .../flink/api/java/RemoteEnvironment.java       |  1 +
 .../jar/CustomInputSplitProgram.java            |  1 +
 .../test/classloading/jar/KMeansForTest.java    |  1 +
 .../test/classloading/jar/StreamingProgram.java |  3 +-
 .../ProcessFailureBatchRecoveryITCase.java      |  1 +
 .../ProcessFailureStreamingRecoveryITCase.java  |  1 +
 11 files changed, 119 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
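
A minimal sketch of the new flag from user code (class name, job name, and output
path are illustrative):

    import org.apache.flink.api.java.ExecutionEnvironment;

    public class QuietJob {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // progress updates still go to the logger, but no longer to System.out
            env.getConfig().disableSysoutLogging();

            env.fromElements(1, 2, 3).writeAsText("file:///tmp/quiet-job-out");
            env.execute("quiet job");
        }
    }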


http://git-wip-us.apache.org/repos/asf/flink/blob/e79813bf/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index d327d6f..07a3a8e 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -55,12 +55,9 @@ public class LocalExecutor extends PlanExecutor {
 
 	// ---------------------------------- config options ------------------------------------------
 	
-
 	private int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
 
 	private boolean defaultOverwriteFiles = DEFAULT_OVERWRITE;
-
-	private boolean printStatusDuringExecution = true;
 	
 	// --------------------------------------------------------------------------------------------
 	
@@ -85,12 +82,12 @@ public class LocalExecutor extends PlanExecutor {
 		this.defaultOverwriteFiles = defaultOverwriteFiles;
 	}
 	
-	public void setTaskManagerNumSlots(int taskManagerNumSlots) { this.taskManagerNumSlots = taskManagerNumSlots; }
-
-	public int getTaskManagerNumSlots() { return this.taskManagerNumSlots; }
+	public void setTaskManagerNumSlots(int taskManagerNumSlots) {
+		this.taskManagerNumSlots = taskManagerNumSlots; 
+	}
 
-	public void setPrintStatusDuringExecution(boolean printStatus) {
-		this.printStatusDuringExecution = printStatus;
+	public int getTaskManagerNumSlots() {
+		return this.taskManagerNumSlots;
 	}
 	
 	// --------------------------------------------------------------------------------------------
@@ -178,7 +175,8 @@ public class LocalExecutor extends PlanExecutor {
 				JobGraphGenerator jgg = new JobGraphGenerator();
 				JobGraph jobGraph = jgg.compileJobGraph(op);
 				
-				SerializedJobExecutionResult result = flink.submitJobAndWait(jobGraph, printStatusDuringExecution);
+				boolean sysoutPrint = isPrintingStatusDuringExecution();
+				SerializedJobExecutionResult result = flink.submitJobAndWait(jobGraph, sysoutPrint);
 				return result.toJobExecutionResult(ClassLoader.getSystemClassLoader());
 			}
 			finally {
@@ -276,16 +274,4 @@ public class LocalExecutor extends PlanExecutor {
 		List<DataSinkNode> sinks = Optimizer.createPreOptimizedPlan(plan);
 		return gen.getPactPlanAsJSON(sinks);
 	}
-
-	/**
-	 * By default, local environments do not overwrite existing files.
-	 * 
-	 * NOTE: This method must be called prior to initializing the LocalExecutor or a 
-	 * {@link org.apache.flink.api.java.LocalEnvironment}.
-	 * 
-	 * @param overwriteByDefault True to overwrite by default, false to not overwrite by default.
-	 */
-	public static void setOverwriteFilesByDefault(boolean overwriteByDefault) {
-		DEFAULT_OVERWRITE = overwriteByDefault;
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e79813bf/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index 1759b65..a523169 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.client;
 
 import java.io.File;
@@ -39,7 +38,18 @@ import org.apache.flink.configuration.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * The RemoteExecutor is a {@link org.apache.flink.api.common.PlanExecutor} that takes the program
+ * and ships it to a remote Flink cluster for execution.
+ * 
+ * The RemoteExecutor is pointed at the JobManager and is given the program and, if
+ * necessary, the set of libraries that need to be shipped together with the program.
+ * 
+ * The RemoteExecutor is used in the {@link org.apache.flink.api.java.RemoteEnvironment} to
+ * remotely execute program parts.
+ */
 public class RemoteExecutor extends PlanExecutor {
+	
 	private static final Logger LOG = LoggerFactory.getLogger(RemoteExecutor.class);
 
 	private final List<String> jarFiles;
@@ -65,22 +75,6 @@ public class RemoteExecutor extends PlanExecutor {
 		this.jarFiles = jarFiles;
 		this.address = inet;
 	}
-	
-	public static InetSocketAddress getInetFromHostport(String hostport) {
-		// from http://stackoverflow.com/questions/2345063/java-common-way-to-validate-and-convert-hostport-to-inetsocketaddress
-		URI uri;
-		try {
-			uri = new URI("my://" + hostport);
-		} catch (URISyntaxException e) {
-			throw new RuntimeException("Could not identify hostname and port", e);
-		}
-		String host = uri.getHost();
-		int port = uri.getPort();
-		if (host == null || port == -1) {
-			throw new RuntimeException("Could not identify hostname and port");
-		}
-		return new InetSocketAddress(host, port);
-	}
 
 	@Override
 	public JobExecutionResult executePlan(Plan plan) throws Exception {
@@ -90,8 +84,10 @@ public class RemoteExecutor extends PlanExecutor {
 	
 	public JobExecutionResult executePlanWithJars(JobWithJars p) throws Exception {
 		Client c = new Client(this.address, new Configuration(), p.getUserCodeClassLoader(), -1);
+		c.setPrintStatusDuringExecution(isPrintingStatusDuringExecution());
+		
 		JobSubmissionResult result = c.run(p, -1, true);
-		if(result instanceof JobExecutionResult) {
+		if (result instanceof JobExecutionResult) {
 			return (JobExecutionResult) result;
 		} else {
 			LOG.warn("The Client didn't return a JobExecutionResult");
@@ -104,6 +100,8 @@ public class RemoteExecutor extends PlanExecutor {
 		PackagedProgram program = new PackagedProgram(jarFile, assemblerClass, args);
 		
 		Client c = new Client(this.address, new Configuration(), program.getUserCodeClassLoader(), -1);
+		c.setPrintStatusDuringExecution(isPrintingStatusDuringExecution());
+		
 		JobSubmissionResult result = c.run(program.getPlanWithJars(), -1, true);
 		if(result instanceof JobExecutionResult) {
 			return (JobExecutionResult) result;
@@ -122,4 +120,24 @@ public class RemoteExecutor extends PlanExecutor {
 		PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
 		return jsonGen.getOptimizerPlanAsJSON(op);
 	}
+	
+	// --------------------------------------------------------------------------------------------
+	//   Utilities
+	// --------------------------------------------------------------------------------------------
+	public static InetSocketAddress getInetFromHostport(String hostport) {
+		// from http://stackoverflow.com/questions/2345063/java-common-way-to-validate-and-convert-hostport-to-inetsocketaddress
+		URI uri;
+		try {
+			uri = new URI("my://" + hostport);
+		} catch (URISyntaxException e) {
+			throw new RuntimeException("Could not identify hostname and port", e);
+		}
+		String host = uri.getHost();
+		int port = uri.getPort();
+		if (host == null || port == -1) {
+			throw new RuntimeException("Could not identify hostname and port");
+		}
+		return new InetSocketAddress(host, port);
+	}
+	
 }
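
A short sketch of the relocated helper (host and port are illustrative):

    import java.net.InetSocketAddress;
    import org.apache.flink.client.RemoteExecutor;

    public class HostPortParsing {
        public static void main(String[] args) {
            InetSocketAddress addr = RemoteExecutor.getInetFromHostport("flink-master:6123");
            // typically prints "flink-master:6123"
            System.out.println(addr.getHostName() + ":" + addr.getPort());
        }
    }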

http://git-wip-us.apache.org/repos/asf/flink/blob/e79813bf/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 1990a2f..3809329 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -76,6 +76,9 @@ public class ExecutionConfig implements Serializable {
 
 	private boolean serializeGenericTypesWithAvro = false;
 
+	/** If set to true, progress updates are printed to System.out during execution */
+	private boolean printProgressDuringExecution = true;
+
 	// Serializers and types registered with Kryo and the PojoSerializer
 	// we store them in lists to ensure they are registered in order in all kryo instances.
 
@@ -281,8 +284,6 @@ public class ExecutionConfig implements Serializable {
 		return serializeGenericTypesWithAvro;
 	}
 
-
-
 	/**
 	 * Enables reusing objects that Flink internally uses for deserialization and passing
 	 * data to user-code functions. Keep in mind that this can lead to bugs when the
@@ -309,6 +310,35 @@ public class ExecutionConfig implements Serializable {
 		return objectReuse;
 	}
 
+	/**
+	 * Enables the printing of progress update messages to {@code System.out}
+	 * 
+	 * @return The ExecutionConfig object, to allow for function chaining.
+	 */
+	public ExecutionConfig enableSysoutLogging() {
+		this.printProgressDuringExecution = true;
+		return this;
+	}
+
+	/**
+	 * Disables the printing of progress update messages to {@code System.out}
+	 *
+	 * @return The ExecutionConfig object, to allow for function chaining.
+	 */
+	public ExecutionConfig disableSysoutLogging() {
+		this.printProgressDuringExecution = false;
+		return this;
+	}
+
+	/**
+	 * Gets whether progress update messages should be printed to {@code System.out}
+	 * 
+	 * @return True, if progress update messages should be printed, false otherwise.
+	 */
+	public boolean isSysoutLoggingEnabled() {
+		return this.printProgressDuringExecution;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Registry for types and serializers
 	// --------------------------------------------------------------------------------------------
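
Since the new setters return the config, they chain with existing ones; a sketch,
assuming an ExecutionEnvironment env is in scope (enableObjectReuse is just an
example of another chained setter):

    ExecutionConfig config = env.getConfig();
    config.disableSysoutLogging()           // progress updates go only to the logger
          .enableObjectReuse();
    boolean verbose = config.isSysoutLoggingEnabled();  // now false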

http://git-wip-us.apache.org/repos/asf/flink/blob/e79813bf/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
index 9eaddd1..74bdb09 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
@@ -27,13 +27,34 @@ import java.util.List;
 
 /**
  * A PlanExecutor runs a plan. The specific implementation (such as the org.apache.flink.client.LocalExecutor
- * and org.apache.flink.client.RemoteExecutor) determines where and how to run the plan. 
+ * and org.apache.flink.client.RemoteExecutor) determines where and how to run the plan.
+ * 
+ * The concrete implementations are loaded dynamically, because they depend on the full set of
+ * dependencies of all runtime classes.
  */
 public abstract class PlanExecutor {
-
+	
 	private static final String LOCAL_EXECUTOR_CLASS = "org.apache.flink.client.LocalExecutor";
 	private static final String REMOTE_EXECUTOR_CLASS = "org.apache.flink.client.RemoteExecutor";
+
+	// ------------------------------------------------------------------------
+	//  Config Options
+	// ------------------------------------------------------------------------
 	
+	/** If true, all execution progress updates are not only logged, but also printed to System.out */
+	private boolean printUpdatesToSysout = true;
+	
+	public void setPrintStatusDuringExecution(boolean printStatus) {
+		this.printUpdatesToSysout = printStatus;
+	}
+	
+	public boolean isPrintingStatusDuringExecution() {
+		return this.printUpdatesToSysout;
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Program Execution
+	// ------------------------------------------------------------------------
 	
 	/**
 	 * Execute the given plan and return the runtime in milliseconds.
@@ -55,7 +76,11 @@ public abstract class PlanExecutor {
 	 * @throws Exception Thrown, if the executor could not connect to the compiler.
 	 */
 	public abstract String getOptimizerPlanAsJSON(Plan plan) throws Exception;
-	
+
+
+	// ------------------------------------------------------------------------
+	//  Executor Factories
+	// ------------------------------------------------------------------------
 	
 	/**
 	 * Creates an executor that runs the plan locally in a multi-threaded environment.
@@ -69,7 +94,8 @@ public abstract class PlanExecutor {
 			return leClass.getConstructor(Configuration.class).newInstance(configuration);
 		}
 		catch (Throwable t) {
-			throw new RuntimeException("An error occurred while loading the local executor (" + LOCAL_EXECUTOR_CLASS + ").", t);
+			throw new RuntimeException("An error occurred while loading the local executor ("
+					+ LOCAL_EXECUTOR_CLASS + ").", t);
 		}
 	}
 
@@ -93,13 +119,15 @@ public abstract class PlanExecutor {
 		
 		Class<? extends PlanExecutor> reClass = loadExecutorClass(REMOTE_EXECUTOR_CLASS);
 		
-		List<String> files = (jarFiles == null || jarFiles.length == 0) ? Collections.<String>emptyList() : Arrays.asList(jarFiles); 
+		List<String> files = (jarFiles == null || jarFiles.length == 0) ? Collections.<String>emptyList()
+																		: Arrays.asList(jarFiles); 
 		
 		try {
 			return reClass.getConstructor(String.class, int.class, List.class).newInstance(hostname, port, files);
 		}
 		catch (Throwable t) {
-			throw new RuntimeException("An error occurred while loading the remote executor (" + REMOTE_EXECUTOR_CLASS + ").", t);
+			throw new RuntimeException("An error occurred while loading the remote executor ("
+					+ REMOTE_EXECUTOR_CLASS + ").", t);
 		}
 	}
 	
@@ -109,7 +137,8 @@ public abstract class PlanExecutor {
 			return leClass.asSubclass(PlanExecutor.class);
 		}
 		catch (ClassNotFoundException cnfe) {
-			throw new RuntimeException("Could not load the executor class (" + className + "). Do you have the 'flink-clients' project in your dependencies?");
+			throw new RuntimeException("Could not load the executor class (" + className
+					+ "). Do you have the 'flink-clients' project in your dependencies?");
 		}
 		catch (Throwable t) {
 			throw new RuntimeException("An error occurred while loading the executor (" + className + ").", t);
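
A sketch of the new knob together with the executor factories, assuming a
previously created Plan plan (host, port, and jar path are placeholders):

    PlanExecutor executor =
            PlanExecutor.createRemoteExecutor("flink-master", 6123, "/path/to/job.jar");
    executor.setPrintStatusDuringExecution(false);  // updates go only to the logger
    JobExecutionResult result = executor.executePlan(plan);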

http://git-wip-us.apache.org/repos/asf/flink/blob/e79813bf/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
index 33bebf6..25042b6 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
@@ -50,6 +50,7 @@ public class LocalEnvironment extends ExecutionEnvironment {
 		Plan p = createProgramPlan(jobName);
 		
 		PlanExecutor executor = PlanExecutor.createLocalExecutor(configuration);
+		executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled());
 		return executor.executePlan(p);
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/e79813bf/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
index df18bbf..c9a4fe0 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
@@ -66,6 +66,7 @@ public class RemoteEnvironment extends ExecutionEnvironment {
 		Plan p = createProgramPlan(jobName);
 		
 		PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, jarFiles);
+		executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled());
 		return executor.executePlan(p);
 	}
 	

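With both environments wired up, a flag set on the ExecutionConfig travels with the
plan to the executor; for a remote job (host, port, and jar path are placeholders):

    ExecutionEnvironment env = ExecutionEnvironment
            .createRemoteEnvironment("flink-master", 6123, "/path/to/job.jar");
    env.getConfig().disableSysoutLogging();  // picked up in RemoteEnvironment.execute()
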
http://git-wip-us.apache.org/repos/asf/flink/blob/e79813bf/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java
index b18d7e7..e251f8d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java
@@ -51,6 +51,7 @@ public class CustomInputSplitProgram {
 		
 		ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(host, port, jarFile);
 		env.setParallelism(parallelism);
+		env.getConfig().disableSysoutLogging();
 
 		DataSet<Integer> data = env.createInput(new CustomInputFormat());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e79813bf/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/KMeansForTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/KMeansForTest.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/KMeansForTest.java
index 794efbd..785464a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/KMeansForTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/KMeansForTest.java
@@ -68,6 +68,7 @@ public class KMeansForTest {
 
 		ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(host, port, jarFile);
 		env.setParallelism(parallelism);
+		env.getConfig().disableSysoutLogging();
 
 		// get input data
 		DataSet<Point> points = env.fromElements(pointsData.split("\n"))

http://git-wip-us.apache.org/repos/asf/flink/blob/e79813bf/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
index 9a244a4..f02a5d6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
@@ -37,7 +37,8 @@ public class StreamingProgram {
 		final int port = Integer.parseInt(args[2]);
 		
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(host, port, jarFile);
-
+		env.getConfig().disableSysoutLogging();
+		
 		DataStream<String> text = env.fromElements(WordCountData.TEXT);
 
 		DataStream<Word> counts =

http://git-wip-us.apache.org/repos/asf/flink/blob/e79813bf/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
index cdee8ce..f2b8c31 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
@@ -67,6 +67,7 @@ public class ProcessFailureBatchRecoveryITCase extends AbstractProcessFailureRec
 		env.setParallelism(PARALLELISM);
 		env.setNumberOfExecutionRetries(1);
 		env.getConfig().setExecutionMode(executionMode);
+		env.getConfig().disableSysoutLogging();
 
 		final long NUM_ELEMENTS = 100000L;
 		final DataSet<Long> result = env.generateSequence(1, NUM_ELEMENTS)

http://git-wip-us.apache.org/repos/asf/flink/blob/e79813bf/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
index 82b9d6a..ea311c4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
@@ -72,6 +72,7 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
 		StreamExecutionEnvironment env = StreamExecutionEnvironment
 									.createRemoteEnvironment("localhost", jobManagerPort);
 		env.setParallelism(PARALLELISM);
+		env.getConfig().disableSysoutLogging();
 		env.setNumberOfExecutionRetries(1);
 		env.enableCheckpointing(200);
 


[2/5] flink git commit: [FLINK-1868] [tests] Fix corner case of EnvironmentInformationTest

Posted by se...@apache.org.
[FLINK-1868] [tests] Fix corner case of EnvironmentInformationTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0e6c9d4b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0e6c9d4b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0e6c9d4b

Branch: refs/heads/master
Commit: 0e6c9d4b53b4b87ec6940e62672baf73f97a8283
Parents: ad63707
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Apr 13 12:52:58 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Apr 13 16:38:21 2015 +0200

----------------------------------------------------------------------
 .../flink/runtime/util/EnvironmentInformationTest.java    | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0e6c9d4b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EnvironmentInformationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/EnvironmentInformationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EnvironmentInformationTest.java
index 8da7b14..764cb57 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/EnvironmentInformationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EnvironmentInformationTest.java
@@ -34,10 +34,12 @@ public class EnvironmentInformationTest {
 			long freeWithGC = EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag();
 			
 			assertTrue(fullHeap > 0);
-			assertTrue(free > 0);
-			assertTrue(freeWithGC > 0);
-			assertTrue(free <= fullHeap);
-			assertTrue(freeWithGC <= fullHeap);
+			assertTrue(free >= 0);
+			assertTrue(freeWithGC >= 0);
+			
+			// we cannot make these assumptions, because the test JVM may grow / shrink during the GC
+			// assertTrue(free <= fullHeap);
+			// assertTrue(freeWithGC <= fullHeap);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
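
The relaxed assertions follow from how the JVM reports memory; a sketch of why the
old bounds can fail:

    Runtime rt = Runtime.getRuntime();
    long total = rt.totalMemory();  // currently committed heap, may grow or shrink
    long free  = rt.freeMemory();   // free space within the currently committed heap
    // 'total' can change between samples (e.g. the heap shrinks after a GC), so a
    // 'free' value compared against an earlier heap-size sample is not guaranteed
    // to be smaller; only 'free >= 0' is a safe assertion for the test JVM.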


[4/5] flink git commit: [FLINK-1879] [client] Simplify JobClient. Hide actorRefs behind method calls where possible.

Posted by se...@apache.org.
[FLINK-1879] [client] Simplify JobClient. Hide actorRefs behind method calls where possible.

 - Drop redundant routing actor
 - Consistently set the flag to subscribe to updates or not.
 - Scala style cleanups: Drop default values for some method parameters.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ad637077
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ad637077
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ad637077

Branch: refs/heads/master
Commit: ad6370772ff863003dc7247dc2123e10ea1c590b
Parents: f81d9f0
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Apr 13 12:50:45 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Apr 13 16:38:21 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/LocalExecutor.java  |  11 +-
 .../org/apache/flink/client/program/Client.java |  50 ++--
 .../apache/flink/client/program/ClientTest.java |  10 +-
 .../apache/flink/runtime/client/JobClient.java  | 256 ++++++++++++++++
 .../flink/runtime/client/JobClientActor.java    | 150 ++++++++++
 .../apache/flink/runtime/client/JobClient.scala | 294 -------------------
 .../runtime/messages/JobClientMessages.scala    |  28 +-
 .../runtime/messages/JobManagerMessages.scala   |   4 +-
 .../runtime/minicluster/FlinkMiniCluster.scala  |  33 ++-
 .../minicluster/LocalFlinkMiniCluster.scala     |  20 +-
 .../SlotCountExceedingParallelismTest.java      |  14 +-
 .../ScheduleOrUpdateConsumersTest.java          |  12 +-
 .../jobmanager/CoLocationConstraintITCase.scala |   2 +-
 .../runtime/jobmanager/JobManagerITCase.scala   |  28 +-
 .../runtime/jobmanager/RecoveryITCase.scala     |   6 +-
 .../runtime/jobmanager/SlotSharingITCase.scala  |   4 +-
 .../TaskManagerFailsWithSlotSharingITCase.scala |   4 +-
 .../flink/streaming/util/ClusterUtil.java       |   7 +-
 .../streaming/util/TestStreamEnvironment.java   |   9 +-
 .../flink/test/util/RecordAPITestBase.java      |   8 +-
 .../apache/flink/test/util/TestEnvironment.java |   9 +-
 .../test/cancelling/CancellingTestBase.java     |  50 ++--
 .../JobSubmissionFailsITCase.java               | 221 ++++++--------
 .../test/failingPrograms/TaskFailureITCase.java |   4 +-
 .../AbstractProcessFailureRecoveryTest.java     |   6 +-
 .../apache/flink/test/util/FailingTestBase.java |  30 +-
 .../jobmanager/JobManagerFailsITCase.scala      |   4 +-
 .../taskmanager/TaskManagerFailsITCase.scala    |   8 +-
 28 files changed, 678 insertions(+), 604 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index 22e17d0..d327d6f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -16,12 +16,10 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.client;
 
 import java.util.List;
 
-import akka.actor.ActorRef;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.Plan;
@@ -29,7 +27,6 @@ import org.apache.flink.api.common.PlanExecutor;
 import org.apache.flink.api.common.Program;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.client.SerializedJobExecutionResult;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -180,12 +177,8 @@ public class LocalExecutor extends PlanExecutor {
 				
 				JobGraphGenerator jgg = new JobGraphGenerator();
 				JobGraph jobGraph = jgg.compileJobGraph(op);
-
-				ActorRef jobClient = flink.getJobClient();
-
-				SerializedJobExecutionResult result =
-						JobClient.submitJobAndWait(jobGraph, printStatusDuringExecution, jobClient, flink.timeout());
-
+				
+				SerializedJobExecutionResult result = flink.submitJobAndWait(jobGraph, printStatusDuringExecution);
 				return result.toJobExecutionResult(ClassLoader.getSystemClassLoader());
 			}
 			finally {

http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index 80bdcb8..b4e5af1 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -49,10 +49,10 @@ import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.SerializedJobExecutionResult;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.JobManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import scala.Tuple2;
 import scala.concurrent.duration.FiniteDuration;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
@@ -316,27 +316,41 @@ public class Client {
 
 	public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws ProgramInvocationException {
 		this.lastJobId = jobGraph.getJobID();
-		final String hostname = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
-		if (hostname == null) {
-			throw new ProgramInvocationException("Could not find hostname of job manager.");
-		}
-
-		FiniteDuration timeout = AkkaUtils.getTimeout(configuration);
 
+		InetSocketAddress jobManagerAddress;
+		try {
+			jobManagerAddress = JobClient.getJobManagerAddress(configuration);
+		}
+		catch (IOException e) {
+			throw new ProgramInvocationException(e.getMessage(), e);
+		}
+		LOG.info("JobManager actor system address is " + jobManagerAddress);
+		
+		LOG.info("Starting client actor system");
 		final ActorSystem actorSystem;
-		final ActorRef client;
-
 		try {
-			Tuple2<ActorSystem, ActorRef> pair = JobClient.startActorSystemAndActor(configuration, false);
-			actorSystem = pair._1();
-			client = pair._2();
+			actorSystem = JobClient.startJobClientActorSystem(configuration);
 		}
 		catch (Exception e) {
-			throw new ProgramInvocationException("Could not build up connection to JobManager.", e);
+			throw new ProgramInvocationException("Could not start client actor system.", e);
+		}
+
+		LOG.info("Looking up JobManager");
+		ActorRef jobManager;
+		try {
+			jobManager = JobManager.getJobManagerRemoteReference(jobManagerAddress, actorSystem, configuration);
 		}
+		catch (IOException e) {
+			throw new ProgramInvocationException("Failed to resolve JobManager", e);
+		}
+		LOG.info("JobManager runs at " + jobManager.path());
+
+		FiniteDuration timeout = AkkaUtils.getTimeout(configuration);
+		LOG.info("Communication between client and JobManager will have a timeout of " + timeout);
 
+		LOG.info("Checking and uploading JAR files");
 		try {
-			JobClient.uploadJarFiles(jobGraph, hostname, client, timeout);
+			JobClient.uploadJarFiles(jobGraph, jobManager, timeout);
 		}
 		catch (IOException e) {
 			throw new ProgramInvocationException("Could not upload the program's JAR files to the JobManager.", e);
@@ -344,8 +358,8 @@ public class Client {
 
 		try{
 			if (wait) {
-				SerializedJobExecutionResult result =
-						JobClient.submitJobAndWait(jobGraph, printStatusDuringExecution, client, timeout);
+				SerializedJobExecutionResult result = JobClient.submitJobAndWait(actorSystem, 
+						jobManager, jobGraph, timeout, printStatusDuringExecution);
 				try {
 					return result.toJobExecutionResult(this.userCodeClassLoader);
 				}
@@ -355,8 +369,8 @@ public class Client {
 				}
 			}
 			else {
-				JobClient.submitJobDetached(jobGraph, client, timeout);
-				// return a "Fake" execution result with the JobId
+				JobClient.submitJobDetached(jobManager, jobGraph, timeout);
+				// return a dummy execution result with the JobId
 				return new JobSubmissionResult(jobGraph.getJobID());
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 22865ed..9a37dde 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -23,7 +23,6 @@ import akka.actor.Props;
 import akka.actor.Status;
 import akka.actor.UntypedActor;
 import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -38,6 +37,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.net.NetUtils;
 import org.junit.After;
 
@@ -224,7 +224,13 @@ public class ClientTest {
 
 		@Override
 		public void onReceive(Object message) throws Exception {
-			getSender().tell(new Status.Success(new JobID()), getSelf());
+			if (message instanceof JobManagerMessages.SubmitJob) {
+				JobID jid = ((JobManagerMessages.SubmitJob) message).jobGraph().getJobID();
+				getSender().tell(new Status.Success(jid), getSelf());
+			}
+			else {
+				getSender().tell(new Status.Failure(new Exception("Unknown message " + message)), getSelf());
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
new file mode 100644
index 0000000..aa03491
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.client;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Address;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.actor.Status;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.messages.JobClientMessages;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Option;
+import scala.Some;
+import scala.Tuple2;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * The JobClient bridges between the JobManager's asynchronous actor messages and the
+ * synchronous method calls that trigger and await job execution.
+ */
+public class JobClient {
+
+	private static final Logger LOG = LoggerFactory.getLogger(JobClient.class);
+
+
+	public static ActorSystem startJobClientActorSystem(Configuration config)
+			throws IOException {
+		LOG.info("Starting JobClient actor system");
+		Option<Tuple2<String, Object>> remoting =
+				new Some<Tuple2<String, Object>>(new Tuple2<String, Object>("", 0));
+
+		// start a remote actor system to listen on an arbitrary port
+		ActorSystem system = AkkaUtils.createActorSystem(config, remoting);
+		Address address = system.provider().getDefaultAddress();
+
+		String host = address.host().isDefined() ? address.host().get() : "(unknown)";
+		int port = address.port().isDefined() ? ((Integer) address.port().get()) : -1;
+		LOG.info("Started JobClient actor system at " + host + ':' + port);
+
+		return system;
+	}
+
+	/**
+	 * Resolves the JobManager's socket address (hostname and RPC port) from the
+	 * configuration.
+	 *
+	 * @param config Configuration object containing all user provided configuration values
+	 * @return The socket address of the JobManager actor system
+	 */
+	public static InetSocketAddress getJobManagerAddress(Configuration config) throws IOException {
+
+		String jobManagerAddress = config.getString(
+				ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
+
+		int jobManagerRPCPort = config.getInteger(
+				ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+				ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
+
+		if (jobManagerAddress == null) {
+			throw new RuntimeException(
+					"JobManager address has not been specified in the configuration.");
+		}
+
+		try {
+			return new InetSocketAddress(
+					InetAddress.getByName(jobManagerAddress), jobManagerRPCPort);
+		}
+		catch (UnknownHostException e) {
+			throw new IOException("Cannot resolve JobManager hostname " + jobManagerAddress, e);
+		}
+	}
+
+	/**
+	 * Submits a {@link JobGraph} to the JobManager via a temporary {@link JobClientActor}
+	 * and blocks until the job has finished or the JobManager is no longer alive. In the
+	 * former case, the {@link SerializedJobExecutionResult} is returned; in the latter case,
+	 * a {@link JobExecutionException} is thrown.
+	 *
+	 * @param actorSystem The actor system that performs the communication.
+	 * @param jobManager  The JobManager that should execute the job.
+	 * @param jobGraph    JobGraph describing the Flink job
+	 * @param timeout     Timeout for futures
+	 * @param sysoutLogUpdates Whether progress updates shall also be printed to System.out
+	 * @return The job execution result
+	 * @throws org.apache.flink.runtime.client.JobExecutionException Thrown if the job
+	 *                                                               execution fails.
+	 */
+	public static SerializedJobExecutionResult submitJobAndWait(ActorSystem actorSystem, ActorRef jobManager,
+																JobGraph jobGraph, FiniteDuration timeout,
+																boolean sysoutLogUpdates) throws JobExecutionException
+	{
+		if (actorSystem == null || jobManager == null || jobGraph == null || timeout == null) {
+			throw new NullPointerException();
+		}
+		// for this job, we create a proxy JobClientActor that deals with all communication with
+		// the JobManager. It forwards the job submission, checks the success/failure responses, logs
+		// update messages, watches for disconnect between client and JobManager, ...
+
+		Props jobClientActorProps = Props.create(JobClientActor.class, jobManager, LOG, sysoutLogUpdates);
+		ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
+
+		try {
+			Future<Object> future = Patterns.ask(jobClientActor,
+					new JobClientMessages.SubmitJobAndWait(jobGraph),
+					new Timeout(AkkaUtils.INF_TIMEOUT()));
+
+			Object answer = Await.result(future, timeout);
+
+			if (answer instanceof JobManagerMessages.JobResultSuccess) {
+				LOG.info("Job execution complete");
+
+				SerializedJobExecutionResult result = ((JobManagerMessages.JobResultSuccess) answer).result();
+				if (result != null) {
+					return result;
+				} else {
+					throw new Exception("Job was successfully executed but result contained a null JobExecutionResult.");
+				}
+			} else if (answer instanceof Status.Failure) {
+				throw ((Status.Failure) answer).cause();
+			} else {
+				throw new Exception("Unknown answer after submitting the job: " + answer);
+			}
+		}
+		catch (JobExecutionException e) {
+			throw e;
+		}
+		catch (TimeoutException e) {
+			throw new JobTimeoutException(jobGraph.getJobID(), "Lost connection to JobManager", e);
+		}
+		catch (Throwable t) {
+			throw new JobExecutionException(jobGraph.getJobID(),
+					"Communication with JobManager failed: " + t.getMessage(), t);
+		}
+		finally {
+			// failsafe shutdown of the client actor
+			jobClientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+		}
+	}
+
+	/**
+	 * Submits a job in detached mode. The method sends the JobGraph to the
+	 * JobManager and waits for the answer whether the job could be started or not.
+	 *
+	 * @param jobManager The JobManager that should execute the job
+	 * @param jobGraph The job
+	 * @param timeout  Timeout in which the JobManager must have responded.
+	 */
+	public static void submitJobDetached(ActorRef jobManager, JobGraph jobGraph, FiniteDuration timeout)
+			throws JobExecutionException {
+		if (jobManager == null || jobGraph == null || timeout == null) {
+			throw new NullPointerException();
+		}
+
+		Future<Object> future = Patterns.ask(jobManager,
+				new JobManagerMessages.SubmitJob(jobGraph, false),
+				new Timeout(timeout));
+		try {
+			Object result = Await.result(future, timeout);
+			if (result instanceof JobID) {
+				JobID respondedID = (JobID) result;
+				if (!respondedID.equals(jobGraph.getJobID())) {
+					throw new Exception("JobManager responded for wrong Job. This Job: " + jobGraph.getJobID() +
+							", response: " + respondedID);
+				}
+			}
+			else {
+				throw new Exception("Unexpected response: " + result);
+			}
+		}
+		catch (JobExecutionException e) {
+			throw e;
+		}
+		catch (TimeoutException e) {
+			throw new JobTimeoutException(jobGraph.getJobID(),
+					"JobManager did not respond within " + timeout.toString(), e);
+		}
+		catch (Throwable t) {
+			throw new JobExecutionException(jobGraph.getJobID(),
+					"Failed to send job to JobManager: " + t.getMessage(), t.getCause());
+		}
+	}
+
+	/**
+	 * Uploads the specified jar files of the {@link JobGraph} jobGraph to the BlobServer of the
+	 * JobManager. The respective port is retrieved from the JobManager. This function issues a
+	 * blocking call.
+	 *
+	 * @param jobGraph   Flink job containing the information about the required jars
+	 * @param jobManager ActorRef of the JobManager.
+	 * @param timeout    Timeout for futures
+	 * @throws IOException Thrown, if the file upload to the JobManager failed.
+	 */
+	public static void uploadJarFiles(JobGraph jobGraph, ActorRef jobManager, FiniteDuration timeout)
+			throws IOException {
+		if (jobGraph.hasUsercodeJarFiles()) {
+			Timeout tOut = new Timeout(timeout);
+			Future<Object> futureBlobPort = Patterns.ask(jobManager,
+					JobManagerMessages.getRequestBlobManagerPort(),
+					tOut);
+
+			int port;
+			try {
+				Object result = Await.result(futureBlobPort, timeout);
+				if (result instanceof Integer) {
+					port = (Integer) result;
+				} else {
+					throw new Exception("Expected port number (int) as answer, received " + result);
+				}
+			}
+			catch (Exception e) {
+				throw new IOException("Could not retrieve the JobManager's blob port.", e);
+			}
+
+			Option<String> jmHost = jobManager.path().address().host();
+			String jmHostname = jmHost.isDefined() ? jmHost.get() : "localhost";
+			InetSocketAddress serverAddress = new InetSocketAddress(jmHostname, port);
+
+			jobGraph.uploadRequiredJarFiles(serverAddress);
+		}
+	}
+}
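
Condensed, the new static methods compose as in the Client changes above (config,
jobGraph, and classLoader are assumed to be in scope):

    ActorSystem actorSystem = JobClient.startJobClientActorSystem(config);
    InetSocketAddress jmAddress = JobClient.getJobManagerAddress(config);
    ActorRef jobManager =
            JobManager.getJobManagerRemoteReference(jmAddress, actorSystem, config);
    FiniteDuration timeout = AkkaUtils.getTimeout(config);

    JobClient.uploadJarFiles(jobGraph, jobManager, timeout);
    SerializedJobExecutionResult serialized =
            JobClient.submitJobAndWait(actorSystem, jobManager, jobGraph, timeout, true);
    JobExecutionResult result = serialized.toJobExecutionResult(classLoader);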

http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
new file mode 100644
index 0000000..ee31e8d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.client;
+
+import akka.actor.ActorRef;
+import akka.actor.PoisonPill;
+import akka.actor.Status;
+import akka.actor.Terminated;
+import akka.actor.UntypedActor;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.messages.ExecutionGraphMessages;
+import org.apache.flink.runtime.messages.JobClientMessages;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.slf4j.Logger;
+
+/**
+ * Actor which constitutes the bridge between the non-actor code and the JobManager. The
+ * JobClientActor submits a job to the JobManager and forwards status updates and the
+ * final result to the original submitter.
+ */
+public class JobClientActor extends UntypedActor {
+	
+	private final ActorRef jobManager;
+	private final Logger logger;
+	private final boolean sysoutUpdates;
+	
+	private ActorRef submitter;
+	
+	
+	public JobClientActor(ActorRef jobManager, Logger logger, boolean sysoutUpdates) {
+		if (jobManager == null || logger == null) {
+			throw new NullPointerException();
+		}
+		this.jobManager = jobManager;
+		this.logger = logger;
+		this.sysoutUpdates = sysoutUpdates;
+	}
+	
+	@Override
+	public void onReceive(Object message) {
+		
+		// =========== State Change Messages ===============
+
+		if (message instanceof ExecutionGraphMessages.ExecutionStateChanged) {
+			logAndPrintMessage(message);
+		}
+		else if (message instanceof ExecutionGraphMessages.JobStatusChanged) {
+			logAndPrintMessage(message);
+		}
+
+		// =========== Job Life Cycle Messages ===============
+		
+		// submit a job to the JobManager
+		else if (message instanceof JobClientMessages.SubmitJobAndWait) {
+			// sanity check that no job was submitted through this actor before -
+			// it is a one-shot actor after all
+			if (this.submitter == null) {
+				JobGraph jobGraph = ((JobClientMessages.SubmitJobAndWait) message).jobGraph();
+				if (jobGraph == null) {
+					logger.error("Received null JobGraph");
+					sender().tell(new Status.Failure(new Exception("JobGraph is null")), getSelf());
+				}
+				else {
+					logger.info("Sending message to JobManager {} to submit job {} ({}) and wait for progress",
+							jobManager.path().toString(), jobGraph.getName(), jobGraph.getJobID());
+
+					this.submitter = getSender();
+					jobManager.tell(new JobManagerMessages.SubmitJob(jobGraph, true), getSelf());
+					
+					// make sure we notify the sender when the connection got lost
+					getContext().watch(jobManager);
+				}
+			}
+			else {
+				// repeated submission - tell failure to sender and kill self
+				String msg = "Received repeated 'SubmitJobAndWait'";
+				logger.error(msg);
+				getSender().tell(new Status.Failure(new Exception(msg)), ActorRef.noSender());
+
+				getContext().unwatch(jobManager);
+				getSelf().tell(PoisonPill.getInstance(), ActorRef.noSender());
+			}
+		}
+		// the acknowledgement of the submission (Status.Success below) is only logged;
+		// the original submitter is only interested in the final job result
+		else if (message instanceof JobManagerMessages.JobResultSuccess) {
+			// forward the success to the original job submitter
+			logger.debug("Received JobResultSuccess message from JobManager");
+			if (this.submitter != null) {
+				this.submitter.tell(message, getSelf());
+			}
+			
+			// we are done, stop ourselves
+			getContext().unwatch(jobManager);
+			getSelf().tell(PoisonPill.getInstance(), ActorRef.noSender());
+		}
+		else if (message instanceof Status.Success) {
+			// job was successfully submitted :-)
+			logger.info("Job was successfully submitted to the JobManager");
+		}
+		else if (message instanceof Status.Failure) {
+			// job execution failed, inform the actor that submitted the job
+			logger.debug("Received failure from JobManager", ((Status.Failure) message).cause());
+			if (submitter != null) {
+				submitter.tell(message, sender());
+			}
+		}
+
+		// =========== Actor / Communication Failure ===============
+		
+		else if (message instanceof Terminated) {
+			ActorRef target = ((Terminated) message).getActor();
+			if (jobManager.equals(target)) {
+				String msg = "Lost connection to JobManager " + jobManager.path();
+				logger.info(msg);
+				submitter.tell(new Status.Failure(new Exception(msg)), getSelf());
+			} else {
+				logger.error("Received 'Terminated' for unknown actor " + target);
+			}
+		}
+
+		// =========== Unknown Messages ===============
+		
+		else {
+			logger.error("JobClient received unknown message: " + message);
+		}
+	}
+	
+	private void logAndPrintMessage(Object message) {
+		logger.info(message.toString());
+		if (sysoutUpdates) {
+			System.out.println(message.toString());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
deleted file mode 100644
index 013fe4c..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
+++ /dev/null
@@ -1,294 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.client
-
-import java.io.IOException
-import java.net.{InetAddress, InetSocketAddress}
-
-import akka.actor.Status.{Success, Failure}
-import akka.actor._
-import akka.pattern.{Patterns, ask}
-import org.apache.flink.configuration.{ConfigConstants, Configuration}
-import org.apache.flink.runtime.ActorLogMessages
-import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.jobgraph.JobGraph
-import org.apache.flink.runtime.jobmanager.JobManager
-import org.apache.flink.runtime.messages.JobClientMessages.{SubmitJobDetached, SubmitJobAndWait}
-import org.apache.flink.runtime.messages.JobManagerMessages._
-
-import scala.concurrent.{TimeoutException, Await}
-import scala.concurrent.duration.FiniteDuration
-
-/**
- * Actor which constitutes the bridge between the non-actor code and the JobManager. The JobClient
- * is used to submit jobs to the JobManager and to request the port of the BlobManager.
- *
- * @param jobManager ActorRef to JobManager
- */
-class JobClient(jobManager: ActorRef) extends
-Actor with ActorLogMessages with ActorLogging {
-
-  override def receiveWithLogMessages: Receive = {
-
-    case SubmitJobDetached(jobGraph) =>
-      jobManager forward SubmitJob(jobGraph, registerForEvents = false)
-
-    case cancelJob: CancelJob =>
-      jobManager forward cancelJob
-
-    case SubmitJobAndWait(jobGraph, listen) =>
-      val listener = context.actorOf(Props(classOf[JobClientListener], sender()))
-      jobManager.tell(SubmitJob(jobGraph, registerForEvents = listen), listener)
-
-    case RequestBlobManagerPort =>
-      jobManager forward RequestBlobManagerPort
-
-    case RequestJobManagerStatus =>
-      jobManager forward RequestJobManagerStatus
-  }
-
-  /**
-   * Handle unmatched messages with an exception.
-   */
-  override def unhandled(message: Any): Unit = {
-    // let the actor crash
-    throw new RuntimeException("Received unknown message " + message)
-  }
-}
-
-/**
- * Helper actor which listens to status messages from the JobManager and prints them on the
- * standard output. Such an actor is started for each job, which is configured to listen to these
- * status messages.
- *
- * @param jobSubmitter Akka URL of the sender of the job
- */
-class JobClientListener(jobSubmitter: ActorRef) extends Actor with ActorLogMessages with
-ActorLogging {
-  override def receiveWithLogMessages: Receive = {
-    case failure: Failure =>
-      jobSubmitter ! failure
-      self ! PoisonPill
-
-    case Success(_) =>
-
-    case JobResultSuccess(result) =>
-      jobSubmitter ! result
-      self ! PoisonPill
-
-    case msg =>
-      // we have to use System.out.println here to avoid erroneous behavior for output redirection
-      System.out.println(msg.toString)
-  }
-
-  /**
-   * Handle unmatched messages with an exception.
-   */
-  override def unhandled(message: Any): Unit = {
-    // let the actor crash
-    throw new RuntimeException("Received unknown message " + message)
-  }
-}
-
-/**
- * JobClient's companion object containing convenience functions to start a JobClient actor, parse
- * the configuration to extract the JobClient's settings and convenience functions to submit jobs.
- */
-object JobClient {
-
-  val JOB_CLIENT_NAME = "jobclient"
-
-  @throws(classOf[IOException])
-  def startActorSystemAndActor(config: Configuration,
-                               localActorSystem: Boolean): (ActorSystem, ActorRef) = {
-
-    // start a remote actor system to listen on an arbitrary port
-    val actorSystem = AkkaUtils.createActorSystem(configuration = config,
-                                                  listeningAddress = Some(("", 0)))
-    try {
-      val jobClientActor = createJobClientFromConfig(config, localActorSystem, actorSystem)
-      (actorSystem, jobClientActor)
-    }
-    catch {
-      case t: Throwable => {
-        actorSystem.shutdown()
-        throw t
-      }
-    }
-  }
-
-  @throws(classOf[IOException])
-  def createJobClientFromConfig(config: Configuration,
-                                localActorSystem: Boolean,
-                                actorSystem: ActorSystem): ActorRef = {
-
-    val jobManagerAddress = getJobManagerUrlFromConfig(config, localActorSystem)
-    createJobClient(jobManagerAddress, actorSystem, config)
-  }
-
-  @throws(classOf[IOException])
-  def createJobClient(jobManagerURL: String,
-                      actorSystem: ActorSystem,
-                      config: Configuration): ActorRef = {
-
-    val timeout = AkkaUtils.getLookupTimeout(config)
-    val jobManager = JobManager.getJobManagerRemoteReference(jobManagerURL, actorSystem, timeout)
-
-    actorSystem.actorOf(Props(classOf[JobClient], jobManager), JOB_CLIENT_NAME)
-  }
-
-
-  /**
-   * Extracts the JobManager's Akka URL from the configuration. If localActorSystem is true, then
-   * the JobClient is executed in the same actor system as the JobManager. Thus, they can
-   * communicate locally.
-   *
-   * @param configuration Configuration object containing all user provided configuration values
-   * @param localActorSystem  true if the JobClient runs in the same actor system as the JobManager,
-   *                          otherwise false
-   * @return Akka URL of the JobManager
-   */
-  def getJobManagerUrlFromConfig(configuration: Configuration,
-                                 localActorSystem: Boolean): String = {
-    if (localActorSystem) {
-      // JobManager and JobClient run in the same ActorSystem
-      JobManager.getLocalJobManagerAkkaURL
-    } else {
-      val jobManagerAddress = configuration.getString(
-        ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
-
-      val jobManagerRPCPort = configuration.getInteger(
-        ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-        ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
-
-      if (jobManagerAddress == null) {
-        throw new RuntimeException(
-          "JobManager address has not been specified in the configuration.")
-      }
-
-      val hostPort = new InetSocketAddress(InetAddress.getByName(jobManagerAddress),
-                                           jobManagerRPCPort)
-      JobManager.getRemoteJobManagerAkkaURL(hostPort)
-    }
-  }
-
-  /**
-   * Sends a [[JobGraph]] to the JobClient actor specified by jobClient which submits it then to
-   * the JobManager. The method blocks until the job has finished or the JobManager is no longer
-   * alive. In the former case, the [[SerializedJobExecutionResult]] is returned and in the latter
-   * case a [[JobExecutionException]] is thrown.
-   *
-   * @param jobGraph JobGraph describing the Flink job
-   * @param listenToStatusEvents true if the JobClient shall print status events of the
-   *                             corresponding job, otherwise false
-   * @param jobClient ActorRef to the JobClient
-   * @param timeout Timeout for futures
-   * @throws org.apache.flink.runtime.client.JobExecutionException Thrown if the job
-   *                                                               execution fails.
-   * @return The job execution result
-   */
-  @throws(classOf[JobExecutionException])
-  def submitJobAndWait(jobGraph: JobGraph,
-                       listenToStatusEvents: Boolean,
-                       jobClient: ActorRef,
-                       timeout: FiniteDuration): SerializedJobExecutionResult = {
-
-    var waitForAnswer = true
-    var answer: SerializedJobExecutionResult = null
-
-    val result = (jobClient ? SubmitJobAndWait(jobGraph, listenToEvents = listenToStatusEvents))(
-      AkkaUtils.INF_TIMEOUT).mapTo[SerializedJobExecutionResult]
-
-    while (waitForAnswer) {
-      try {
-        answer = Await.result(result, timeout)
-        waitForAnswer = false
-      } catch {
-        case x: TimeoutException =>
-          val jmStatus = (jobClient ? RequestJobManagerStatus)(timeout).mapTo[JobManagerStatus]
-
-          try {
-            Await.result(jmStatus, timeout)
-          } catch {
-            case t: Throwable =>
-              throw new JobTimeoutException(jobGraph.getJobID, "Lost connection to " +
-                "job manager.", t)
-          }
-      }
-    }
-
-    answer
-  }
-
-  /**
-   * Submits a job in detached mode. The method sends the corresponding [[JobGraph]] to the
-   * JobClient specified by jobClient. The JobClient does not start a [[JobClientListener]] and
-   * simply returns a possible failure on the [[JobManager]].
-   *
-   * @param jobGraph Flink job
-   * @param jobClient ActorRef to the JobClient
-   * @param timeout Timeout for futures
-   * @return The submission response
-   */
-  @throws(classOf[JobExecutionException])
-  def submitJobDetached(jobGraph: JobGraph,
-                        jobClient: ActorRef,
-                        timeout: FiniteDuration): Unit = {
-
-    val response = (jobClient ? SubmitJobDetached(jobGraph))(timeout)
-
-    try {
-      Await.result(response, timeout)
-    } catch {
-      case timeout: TimeoutException =>
-        throw new JobTimeoutException(jobGraph.getJobID,
-          "Timeout while submitting the job to the JobManager.", timeout);
-    }
-  }
-
-  /**
-   * Uploads the specified jar files of the [[JobGraph]] jobGraph to the BlobServer of the
-   * JobManager. The respective port is retrieved from the JobManager. This function issues a
-   * blocking call.
-   *
-   * @param jobGraph Flink job containing the information about the required jars
-   * @param hostname Hostname of the instance on which the BlobServer and also the JobManager run
-   * @param jobClient ActorRef to the JobClient
-   * @param timeout Timeout for futures
-   * @throws IOException Thrown, if the file upload to the JobManager failed.
-   */
-  @throws(classOf[IOException])
-  def uploadJarFiles(jobGraph: JobGraph, hostname: String, jobClient: ActorRef)(
-    implicit timeout: FiniteDuration): Unit = {
-
-    if (jobGraph.hasUsercodeJarFiles()) {
-      val futureBlobPort = Patterns.ask(jobClient, RequestBlobManagerPort, timeout).mapTo[Int]
-
-      val port = try {
-        Await.result(futureBlobPort, timeout)
-      } catch {
-        case e: Exception => throw new IOException("Could not retrieve the server's blob port.", e)
-      }
-
-      val serverAddress = new InetSocketAddress(hostname, port)
-
-      jobGraph.uploadRequiredJarFiles(serverAddress)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala
index 425d7f8..e0dce35 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala
@@ -26,26 +26,24 @@ import org.apache.flink.runtime.jobgraph.JobGraph
 object JobClientMessages {
 
   /**
-   * This message submits a jobGraph to the JobClient which sends it to the JobManager. The
-   * JobClient waits until the job has been executed. If listenToEvents is true,
-   * then the JobClient prints all state change messages to the console. The
-   * JobClient sends the result of the execution back to the sender. If the execution is
-   * successful then a [[org.apache.flink.runtime.messages.JobManagerMessages.JobResult]] is sent
-   * back. If a [[org.apache.flink.runtime.messages.JobManagerMessages.SubmissionFailure]]
-   * happens, then the cause is sent back to the sender().
+   * This message is sent to the JobClient (via ask) to submit a job and
+   * get a response when the job execution has finished.
+   * 
+   * The response to this message is a
+   * [[org.apache.flink.runtime.client.SerializedJobExecutionResult]]
    *
-   * @param jobGraph containing the job description
-   * @param listenToEvents if true then print state change messages
+   * @param jobGraph The job to be executed.
    */
-  case class SubmitJobAndWait(jobGraph: JobGraph, listenToEvents: Boolean = false)
+  case class SubmitJobAndWait(jobGraph: JobGraph)
 
   /**
-   * This message submits a jobGraph to the JobClient which sends it to the JobManager. The
-   * JobClient awaits the
-   * [[org.apache.flink.runtime.messages.JobManagerMessages.SubmissionResponse]]
-   * from the JobManager and sends it back to the sender().
+   * This message is sent to the JobClient (via ask) to submit a job and 
+   * return as soon as the result of the submit operation is known. 
    *
-   * @param jobGraph containing the job description
+   * The response to this message is a
+   * [[org.apache.flink.api.common.JobSubmissionResult]]
+   *
+   * @param jobGraph The job to be executed.
    */
   case class SubmitJobDetached(jobGraph: JobGraph)
 }
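
As a usage illustration of the ask pattern these message doc comments describe, here is a
minimal sketch. It assumes a jobClient ActorRef, a JobGraph, and a FiniteDuration timeout
are in scope; the variable names are illustrative, not part of the diff:

  import akka.pattern.ask
  import scala.concurrent.Await
  import org.apache.flink.runtime.client.SerializedJobExecutionResult

  // blocking submission: the answer is a SerializedJobExecutionResult
  val resultFuture = (jobClient ? SubmitJobAndWait(jobGraph))(timeout)
                       .mapTo[SerializedJobExecutionResult]
  val result = Await.result(resultFuture, timeout)

  // detached submission: the answer only acknowledges that the job was accepted
  Await.result((jobClient ? SubmitJobDetached(jobGraph))(timeout), timeout)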

http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index 5b70294..03e837d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -37,10 +37,10 @@ object JobManagerMessages {
    * then the sender will be registered as listener for the state change messages.
    * The submission result will be sent back to the sender as a success message.
    *
-   * @param jobGraph
+   * @param jobGraph The job to be submitted to the JobManager
    * @param registerForEvents if true, then register for state change events
    */
-  case class SubmitJob(jobGraph: JobGraph, registerForEvents: Boolean = false)
+  case class SubmitJob(jobGraph: JobGraph, registerForEvents: Boolean)
 
   /**
    * Cancels a job with the given [[jobID]] at the JobManager. The result of the cancellation is

http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 0e29345..8a6c394 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -24,11 +24,15 @@ import akka.pattern.Patterns.gracefulStop
 import akka.pattern.ask
 import akka.actor.{ActorRef, ActorSystem}
 import com.typesafe.config.Config
+import org.apache.flink.api.common.JobSubmissionResult
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.client.{JobExecutionException, JobClient, SerializedJobExecutionResult}
+import org.apache.flink.runtime.jobgraph.JobGraph
 import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
 import org.slf4j.LoggerFactory
 
+import scala.concurrent.duration.FiniteDuration
 import scala.concurrent.{Future, Await}
 
 /**
@@ -162,10 +166,8 @@ abstract class FlinkMiniCluster(val userConfiguration: Configuration,
   def awaitTermination(): Unit = {
     jobManagerActorSystem.awaitTermination()
 
-    if(!singleActorSystem) {
-      taskManagerActorSystems foreach {
-        _.awaitTermination()
-      }
+    taskManagerActorSystems foreach {
+      _.awaitTermination()
     }
   }
 
@@ -178,4 +180,27 @@ abstract class FlinkMiniCluster(val userConfiguration: Configuration,
 
     Await.ready(Future.sequence(futures), timeout)
   }
+
+  @throws(classOf[JobExecutionException])
+  def submitJobAndWait(jobGraph: JobGraph, printUpdates: Boolean)
+                                                                : SerializedJobExecutionResult = {
+
+    submitJobAndWait(jobGraph, printUpdates, timeout)
+  }
+  
+  @throws(classOf[JobExecutionException])
+  def submitJobAndWait(jobGraph: JobGraph, printUpdates: Boolean, timeout: FiniteDuration)
+                                                                 : SerializedJobExecutionResult = {
+
+    val clientActorSystem = if (singleActorSystem) jobManagerActorSystem
+    else JobClient.startJobClientActorSystem(configuration)
+
+    JobClient.submitJobAndWait(clientActorSystem, jobManagerActor, jobGraph, timeout, printUpdates)
+  }
+
+  @throws(classOf[JobExecutionException])
+  def submitJobDetached(jobGraph: JobGraph) : JobSubmissionResult = {
+    JobClient.submitJobDetached(jobManagerActor, jobGraph, timeout)
+    new JobSubmissionResult(jobGraph.getJobID)
+  }
 }
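
A minimal sketch of how a test can drive these new convenience methods, assuming a JobGraph
named jobGraph has already been built; this mirrors the call sites updated in the tests below:

  import org.apache.flink.configuration.Configuration
  import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster

  val cluster = new LocalFlinkMiniCluster(new Configuration(), true)
  try {
    // blocking call: waits until the job finishes and returns the serialized result;
    // submitJobDetached(jobGraph) would instead return right after the job is accepted
    val result = cluster.submitJobAndWait(jobGraph, false)
    val executionResult = result.toJobExecutionResult(getClass.getClassLoader)
  } finally {
    cluster.stop()
  }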

http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 13e1ccd..d7cd6e9 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -48,11 +48,9 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem:
     jobManagerActorSystem
   } else {
     // create an actor system listening on a random port
-    AkkaUtils.createDefaultActorSystem()
+    JobClient.startJobClientActorSystem(configuration)
   }
 
-  var jobClient: Option[ActorRef] = None
-
 
   override def generateConfiguration(userConfiguration: Configuration): Configuration = {
     val config = getDefaultConfig
@@ -114,22 +112,6 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem:
                                                    classOf[TaskManager])
   }
 
-  def getJobClient(): ActorRef = {
-    jobClient match {
-      case Some(jc) => jc
-      case None =>
-        val config = new Configuration()
-
-        config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, HOSTNAME)
-        config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, getJobManagerRPCPort)
-
-        val jc = JobClient.createJobClientFromConfig(config, singleActorSystem,
-          jobClientActorSystem)
-        jobClient = Some(jc)
-        jc
-    }
-  }
-
   def getJobClientActorSystem: ActorSystem = jobClientActorSystem
 
   def getJobManagerRPCPort: Int = {

http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
index f984ca9..b404aae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.jobmanager;
 
-import akka.actor.ActorRef;
-import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.io.network.api.reader.RecordReader;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
@@ -45,7 +43,6 @@ public class SlotCountExceedingParallelismTest {
 	private final static int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
 
 	private static TestingCluster flink;
-	private static ActorRef jobClient;
 
 	@BeforeClass
 	public static void setUp() throws Exception {
@@ -53,11 +50,6 @@ public class SlotCountExceedingParallelismTest {
 				NUMBER_OF_SLOTS_PER_TM,
 				NUMBER_OF_TMS,
 				TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
-
-		jobClient = JobClient.createJobClientFromConfig(
-				flink.configuration(),
-				true,
-				flink.jobManagerActorSystem());
 	}
 
 	@AfterClass
@@ -85,11 +77,7 @@ public class SlotCountExceedingParallelismTest {
 	// ---------------------------------------------------------------------------------------------
 
 	private void submitJobGraphAndWait(final JobGraph jobGraph) throws JobExecutionException {
-		JobClient.submitJobAndWait(
-				jobGraph,
-				false,
-				jobClient,
-				TestingUtils.TESTING_DURATION());
+		flink.submitJobAndWait(jobGraph, false, TestingUtils.TESTING_DURATION());
 	}
 
 	private JobGraph createTestJobGraph(

http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
index ade14a1..08f8bfb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
@@ -18,9 +18,8 @@
 
 package org.apache.flink.runtime.jobmanager.scheduler;
 
-import akka.actor.ActorRef;
 import com.google.common.collect.Lists;
-import org.apache.flink.runtime.client.JobClient;
+
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
@@ -31,6 +30,7 @@ import org.apache.flink.runtime.jobmanager.SlotCountExceedingParallelismTest;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.types.IntValue;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -46,7 +46,6 @@ public class ScheduleOrUpdateConsumersTest {
 	private final static int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
 
 	private static TestingCluster flink;
-	private static ActorRef jobClient;
 
 	@BeforeClass
 	public static void setUp() throws Exception {
@@ -54,11 +53,6 @@ public class ScheduleOrUpdateConsumersTest {
 				NUMBER_OF_SLOTS_PER_TM,
 				NUMBER_OF_TMS,
 				TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
-
-		jobClient = JobClient.createJobClientFromConfig(
-				flink.configuration(),
-				true,
-				flink.jobManagerActorSystem());
 	}
 
 	@AfterClass
@@ -122,7 +116,7 @@ public class ScheduleOrUpdateConsumersTest {
 				pipelinedReceiver,
 				blockingReceiver);
 
-		JobClient.submitJobAndWait(jobGraph, false, jobClient, TestingUtils.TESTING_DURATION());
+		flink.submitJobAndWait(jobGraph, false, TestingUtils.TESTING_DURATION());
 	}
 
 	// ---------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
index 28be994..07f0ce5 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.scala
@@ -69,7 +69,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll with WrapA
 
       try {
         within(TestingUtils.TESTING_DURATION) {
-          jm ! SubmitJob(jobGraph)
+          jm ! SubmitJob(jobGraph, false)
           expectMsg(Success(jobGraph.getJobID))
 
           expectMsgType[JobResultSuccess]

http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 0fa3d5a..ee584f0 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -71,7 +71,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
         availableSlots should equal(1)
 
         within(2 second) {
-          jm ! SubmitJob(jobGraph)
+          jm ! SubmitJob(jobGraph, false)
 
           val success = expectMsgType[Success]
 
@@ -116,7 +116,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
         availableSlots should equal(num_tasks)
 
         within(TestingUtils.TESTING_DURATION) {
-          jm ! SubmitJob(jobGraph)
+          jm ! SubmitJob(jobGraph, false)
 
           expectMsg(Success(jobGraph.getJobID))
           val result = expectMsgType[JobResultSuccess]
@@ -146,7 +146,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
 
       try {
         within(TestingUtils.TESTING_DURATION) {
-          jm ! SubmitJob(jobGraph)
+          jm ! SubmitJob(jobGraph, false)
 
           expectMsg(Success(jobGraph.getJobID))
 
@@ -181,7 +181,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
 
       try {
         within(TestingUtils.TESTING_DURATION) {
-          jm ! SubmitJob(jobGraph)
+          jm ! SubmitJob(jobGraph, false)
 
           expectMsg(Success(jobGraph.getJobID))
 
@@ -216,7 +216,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
 
       try {
         within(TestingUtils.TESTING_DURATION) {
-          jm ! SubmitJob(jobGraph)
+          jm ! SubmitJob(jobGraph, false)
 
           expectMsg(Success(jobGraph.getJobID))
 
@@ -253,7 +253,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
 
       try {
         within(TestingUtils.TESTING_DURATION) {
-          jm ! SubmitJob(jobGraph)
+          jm ! SubmitJob(jobGraph, false)
 
           expectMsg(Success(jobGraph.getJobID))
           val failure = expectMsgType[Failure]
@@ -297,7 +297,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
 
       try {
         within(TestingUtils.TESTING_DURATION) {
-          jm ! SubmitJob(jobGraph)
+          jm ! SubmitJob(jobGraph, false)
           expectMsg(Success(jobGraph.getJobID))
 
           expectMsgType[JobResultSuccess]
@@ -341,7 +341,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
 
       try {
         within(TestingUtils.TESTING_DURATION) {
-          jm ! SubmitJob(jobGraph)
+          jm ! SubmitJob(jobGraph, false)
 
           expectMsg(Success(jobGraph.getJobID))
 
@@ -380,7 +380,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
         }
 
         within(TestingUtils.TESTING_DURATION) {
-          jm ! SubmitJob(jobGraph)
+          jm ! SubmitJob(jobGraph, false)
           expectMsg(Success(jobGraph.getJobID))
 
           val failure = expectMsgType[Failure]
@@ -428,7 +428,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
         }
 
         within(TestingUtils.TESTING_DURATION) {
-          jm ! SubmitJob(jobGraph)
+          jm ! SubmitJob(jobGraph, false)
           expectMsg(Success(jobGraph.getJobID))
           val failure = expectMsgType[Failure]
 
@@ -467,7 +467,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
 
       try {
         within(TestingUtils.TESTING_DURATION) {
-          jm ! SubmitJob(jobGraph)
+          jm ! SubmitJob(jobGraph, false)
           expectMsg(Success(jobGraph.getJobID))
           val failure = expectMsgType[Failure]
 
@@ -509,7 +509,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
           jm ! RequestTotalNumberOfSlots
           expectMsg(num_tasks)
 
-          jm ! SubmitJob(jobGraph)
+          jm ! SubmitJob(jobGraph, false)
           expectMsg(Success(jobGraph.getJobID))
           val failure = expectMsgType[Failure]
 
@@ -556,7 +556,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
           jm ! RequestTotalNumberOfSlots
           expectMsg(num_tasks)
 
-          jm ! SubmitJob(jobGraph)
+          jm ! SubmitJob(jobGraph, false)
           expectMsg(Success(jobGraph.getJobID))
           val failure = expectMsgType[Failure]
 
@@ -595,7 +595,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
 
       try{
         within(TestingUtils.TESTING_DURATION){
-          jm ! SubmitJob(jobGraph)
+          jm ! SubmitJob(jobGraph, false)
 
           expectMsg(Success(jobGraph.getJobID))
           expectMsgType[JobResultSuccess]

http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
index 53bc70c..dfc650e 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
@@ -78,7 +78,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
 
       try {
         within(TestingUtils.TESTING_DURATION){
-          jm ! SubmitJob(jobGraph)
+          jm ! SubmitJob(jobGraph, false)
 
           expectMsg(Success(jobGraph.getJobID))
 
@@ -121,7 +121,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
 
       try {
         within(TestingUtils.TESTING_DURATION){
-          jm ! SubmitJob(jobGraph)
+          jm ! SubmitJob(jobGraph, false)
 
           expectMsg(Success(jobGraph.getJobID))
 
@@ -165,7 +165,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
 
       try {
         within(TestingUtils.TESTING_DURATION){
-          jm ! SubmitJob(jobGraph)
+          jm ! SubmitJob(jobGraph, false)
 
           expectMsg(Success(jobGraph.getJobID))
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
index d719dc3..faff2f2 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/SlotSharingITCase.scala
@@ -65,7 +65,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
 
       try {
         within(TestingUtils.TESTING_DURATION) {
-          jm ! SubmitJob(jobGraph)
+          jm ! SubmitJob(jobGraph, false)
           expectMsg(Success(jobGraph.getJobID))
           expectMsgType[JobResultSuccess]
 
@@ -108,7 +108,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
       val jm = cluster.getJobManager
       try {
         within(TestingUtils.TESTING_DURATION) {
-          jm ! SubmitJob(jobGraph)
+          jm ! SubmitJob(jobGraph, false)
           expectMsg(Success(jobGraph.getJobID))
           expectMsgType[JobResultSuccess]
         }

http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
index dcfc899..39543f7 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
@@ -69,7 +69,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
 
       try{
         within(TestingUtils.TESTING_DURATION) {
-          jm ! SubmitJob(jobGraph)
+          jm ! SubmitJob(jobGraph, false)
           expectMsg(Success(jobGraph.getJobID))
 
           jm ! WaitForAllVerticesToBeRunningOrFinished(jobID)
@@ -118,7 +118,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
 
       try{
         within(TestingUtils.TESTING_DURATION) {
-          jm ! SubmitJob(jobGraph)
+          jm ! SubmitJob(jobGraph, false)
           expectMsg(Success(jobGraph.getJobID))
 
           jm ! WaitForAllVerticesToBeRunningOrFinished(jobID)

http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
index 85563e9..64b7bd8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
@@ -20,15 +20,12 @@ package org.apache.flink.streaming.util;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.client.SerializedJobExecutionResult;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import akka.actor.ActorRef;
-
 public class ClusterUtil {
 
 	private static final Logger LOG = LoggerFactory.getLogger(ClusterUtil.class);
@@ -60,10 +57,8 @@ public class ClusterUtil {
 
 		try {
 			exec = new LocalFlinkMiniCluster(configuration, true);
-			ActorRef jobClient = exec.getJobClient();
 
-			SerializedJobExecutionResult result =
-					JobClient.submitJobAndWait(jobGraph, true, jobClient, exec.timeout());
+			SerializedJobExecutionResult result = exec.submitJobAndWait(jobGraph, true);
 			return result.toJobExecutionResult(ClusterUtil.class.getClassLoader());
 		}
 		finally {

http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index 4dac980..0ff5c56 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -70,15 +70,12 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
 			executor = new ForkableFlinkMiniCluster(configuration);
 		}
 		try {
-			ActorRef client = executor.getJobClient();
-
-			SerializedJobExecutionResult result =
-					JobClient.submitJobAndWait(jobGraph, false, client, executor.timeout());
-
+			
+			SerializedJobExecutionResult result = executor.submitJobAndWait(jobGraph, false);
 			latestResult = result.toJobExecutionResult(getClass().getClassLoader());
 			return latestResult;
 		}
-		catch(JobExecutionException e) {
+		catch (JobExecutionException e) {
 			if (e.getMessage().contains("GraphConversionException")) {
 				throw new Exception(CANNOT_EXECUTE_EMPTY_JOB, e);
 			} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
index 6acd5b0..9c6062e 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
@@ -16,10 +16,8 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.test.util;
 
-import akka.actor.ActorRef;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.optimizer.DataStatistics;
@@ -28,9 +26,9 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.client.SerializedJobExecutionResult;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -121,9 +119,7 @@ public abstract class RecordAPITestBase extends AbstractTestBase {
 			Assert.assertNotNull("Obtained null JobGraph", jobGraph);
 
 			try {
-				ActorRef client = this.executor.getJobClient();
-				SerializedJobExecutionResult result =
-						JobClient.submitJobAndWait(jobGraph, false, client, executor.timeout());
+				SerializedJobExecutionResult result = executor.submitJobAndWait(jobGraph, false);
 				this.jobExecutionResult = result.toJobExecutionResult(getClass().getClassLoader());
 			}
 			catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
index eaf9854..cf1caeb 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.test.util;
 
-import akka.actor.ActorRef;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -28,9 +27,9 @@ import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.client.SerializedJobExecutionResult;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+
 import org.junit.Assert;
 
 public class TestEnvironment extends ExecutionEnvironment {
@@ -52,10 +51,8 @@ public class TestEnvironment extends ExecutionEnvironment {
 
 			JobGraphGenerator jgg = new JobGraphGenerator();
 			JobGraph jobGraph = jgg.compileJobGraph(op);
-
-			ActorRef client = this.executor.getJobClient();
-			SerializedJobExecutionResult result =
-					JobClient.submitJobAndWait(jobGraph, false, client, executor.timeout());
+			
+			SerializedJobExecutionResult result = executor.submitJobAndWait(jobGraph, false);
 
 			this.latestResult = result.toJobExecutionResult(getClass().getClassLoader());
 			return this.latestResult;

http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
index 345bffd..a3186a7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
@@ -19,17 +19,12 @@
 
 package org.apache.flink.test.cancelling;
 
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobCancellationException;
-import org.apache.flink.runtime.messages.JobClientMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.junit.Assert;
@@ -44,11 +39,9 @@ import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.util.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
+
 import org.junit.After;
 import org.junit.Before;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
 
 /**
  * 
@@ -106,33 +99,46 @@ public abstract class CancellingTestBase {
 		runAndCancelJob(plan, msecsTillCanceling, DEFAULT_CANCEL_FINISHED_INTERVAL);
 	}
 		
-	public void runAndCancelJob(Plan plan, int msecsTillCanceling, int maxTimeTillCanceled) throws Exception {
+	public void runAndCancelJob(Plan plan, final int msecsTillCanceling, int maxTimeTillCanceled) throws Exception {
 		try {
 			// submit job
 			final JobGraph jobGraph = getJobGraph(plan);
-			final ActorRef client = this.executor.getJobClient();
-			final ActorSystem actorSystem = executor.getJobClientActorSystem();
+			
+			final Thread currentThread = Thread.currentThread();
+			final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+			
 			boolean jobSuccessfullyCancelled = false;
 
-			Future<Object> result = Patterns.ask(client, new JobClientMessages.SubmitJobAndWait
-					(jobGraph, false), new Timeout(AkkaUtils.getDefaultTimeout()));
-
-			actorSystem.scheduler().scheduleOnce(new FiniteDuration(msecsTillCanceling,
-							TimeUnit.MILLISECONDS), client, new JobManagerMessages.CancelJob(jobGraph.getJobID()),
-					actorSystem.dispatcher(), ActorRef.noSender());
+			// trigger the cancellation asynchronously
+			new Thread() {
+				@Override
+				public void run() {
+					try {
+						Thread.sleep(msecsTillCanceling);
+						executor.getJobManager().tell(new JobManagerMessages.CancelJob(jobGraph.getJobID()), ActorRef.noSender());
+					}
+					catch (Throwable t) {
+						error.set(t);
+						currentThread.interrupt();
+					}
+				}
+			}.start();
 
 			try {
-				Await.result(result, AkkaUtils.getDefaultTimeout());
-			} catch (JobCancellationException exception) {
+				executor.submitJobAndWait(jobGraph, false);
+			}
+			catch (JobCancellationException exception) {
 				jobSuccessfullyCancelled = true;
-			} catch (Exception e) {
+			}
+			catch (Exception e) {
 				throw new IllegalStateException("Job failed.", e);
 			}
 
 			if (!jobSuccessfullyCancelled) {
 				throw new IllegalStateException("Job was not successfully cancelled.");
 			}
-		}catch(Exception e){
+		}
+		catch(Exception e) {
 			LOG.error("Exception found in runAndCancelJob.", e);
 			Assert.fail(StringUtils.stringifyException(e));
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
index a54bd97..ca1c304 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
@@ -18,12 +18,9 @@
 
 package org.apache.flink.test.failingPrograms;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.client.JobClient;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.client.SerializedJobExecutionResult;
@@ -32,6 +29,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.Tasks;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -47,26 +45,44 @@ import static org.junit.Assert.fail;
 
 @RunWith(Parameterized.class)
 public class JobSubmissionFailsITCase {
-
-	private static ActorSystem system;
-
+	
+	private static final int NUM_SLOTS = 20;
+	
+	private static ForkableFlinkMiniCluster cluster;
 	private static JobGraph workingJobGraph;
 
 	@BeforeClass
 	public static void setup() {
-		system = ActorSystem.create("TestingActorSystem", AkkaUtils.getDefaultAkkaConfig());
-
-		final AbstractJobVertex jobVertex = new AbstractJobVertex("Working job vertex.");
-		jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
-
-		workingJobGraph = new JobGraph("Working testing job", jobVertex);
+		try {
+			Configuration config = new Configuration();
+			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
+			config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 2);
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS / 2);
+			
+			cluster = new ForkableFlinkMiniCluster(config);
+			
+			final AbstractJobVertex jobVertex = new AbstractJobVertex("Working job vertex.");
+			jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+			workingJobGraph = new JobGraph("Working testing job", jobVertex);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
 	}
 
 	@AfterClass
 	public static void teardown() {
-		JavaTestKit.shutdownActorSystem(system);
-		system = null;
+		try {
+			cluster.shutdown();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
 	}
+	
+	// --------------------------------------------------------------------------------------------
 
 	private boolean detached;
 
@@ -80,148 +96,101 @@ public class JobSubmissionFailsITCase {
 				new Boolean[]{true});
 	}
 
-	private JobExecutionResult submitJob(JobGraph jobGraph, ActorRef jobClient) throws Exception {
+	// --------------------------------------------------------------------------------------------
+	
+	private JobExecutionResult submitJob(JobGraph jobGraph) throws Exception {
 		if (detached) {
-			JobClient.submitJobDetached(jobGraph, jobClient, TestingUtils.TESTING_DURATION());
+			cluster.submitJobDetached(jobGraph);
 			return null;
 		}
 		else {
-			SerializedJobExecutionResult result =
-					JobClient.submitJobAndWait(jobGraph, false, jobClient, TestingUtils.TESTING_DURATION());
+			SerializedJobExecutionResult result = cluster.submitJobAndWait(
+												jobGraph, false, TestingUtils.TESTING_DURATION());
 			return result.toJobExecutionResult(getClass().getClassLoader());
 		}
 	}
 
 	@Test
 	public void testExceptionInInitializeOnMaster() {
-		new JavaTestKit(system) {{
-			final int numSlots = 20;
-
-			final ForkableFlinkMiniCluster cluster =
-					ForkableFlinkMiniCluster.startCluster(numSlots/2, 2,
-							TestingUtils.TESTING_DURATION().toString());
-
-			final ActorRef jobClient = cluster.getJobClient();
-
+		try {
 			final AbstractJobVertex failingJobVertex = new FailingJobVertex("Failing job vertex");
 			failingJobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
 
 			final JobGraph failingJobGraph = new JobGraph("Failing testing job", failingJobVertex);
 
 			try {
-				new Within(TestingUtils.TESTING_DURATION()) {
-
-					@Override
-					protected void run() {
-						try {
-							submitJob(failingJobGraph, jobClient);
-							fail("Expected JobExecutionException.");
-						} catch (JobExecutionException e) {
-							assertEquals("Test exception.", e.getCause().getMessage());
-						} catch (Throwable t) {
-							fail("Caught wrong exception of type " + t.getClass() + ".");
-							t.printStackTrace();
-						}
-
-						try {
-							JobClient.submitJobAndWait(workingJobGraph, false, jobClient,
-									TestingUtils.TESTING_DURATION());
-						} catch (Throwable t) {
-							fail("Caught unexpected exception " + t.getMessage() + ".");
-						}
-					}
-				};
-			} finally {
-				cluster.stop();
+				submitJob(failingJobGraph);
+				fail("Expected JobExecutionException.");
+			}
+			catch (JobExecutionException e) {
+				assertEquals("Test exception.", e.getCause().getMessage());
+			}
+			catch (Throwable t) {
+				t.printStackTrace();
+				fail("Caught wrong exception of type " + t.getClass() + ".");
 			}
-		}};
+
+			cluster.submitJobAndWait(workingJobGraph, false);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
 	}
 
 	@Test
 	public void testSubmitEmptyJobGraph() {
-		new JavaTestKit(system) {{
-			final int numSlots = 20;
-
-			final ForkableFlinkMiniCluster cluster =
-					ForkableFlinkMiniCluster.startCluster(numSlots/2, 2,
-							TestingUtils.TESTING_DURATION().toString());
-
-			final ActorRef jobClient = cluster.getJobClient();
-
+		try {
 			final JobGraph jobGraph = new JobGraph("Testing job");
-
+	
 			try {
-				new Within(TestingUtils.TESTING_DURATION()) {
-
-					@Override
-					protected void run() {
-						try {
-							submitJob(jobGraph, jobClient);
-							fail("Expected JobSubmissionException.");
-						}
-						catch (JobSubmissionException e) {
-							assertTrue(e.getMessage() != null && e.getMessage().contains("empty"));
-						}
-						catch (Throwable t) {
-							t.printStackTrace();
-							fail("Caught wrong exception of type " + t.getClass() + ".");
-						}
-
-						try {
-							JobClient.submitJobAndWait(workingJobGraph, false, jobClient,
-									TestingUtils.TESTING_DURATION());
-						} catch (Throwable t) {
-							fail("Caught unexpected exception " + t.getMessage() + ".");
-						}
-					}
-				};
-			} finally {
-				cluster.stop();
+				submitJob(jobGraph);
+				fail("Expected JobSubmissionException.");
+			}
+			catch (JobSubmissionException e) {
+				assertTrue(e.getMessage() != null && e.getMessage().contains("empty"));
+			}
+			catch (Throwable t) {
+				t.printStackTrace();
+				fail("Caught wrong exception of type " + t.getClass() + ".");
 			}
-		}};
+	
+			cluster.submitJobAndWait(workingJobGraph, false);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
 	}
-
+	
 	@Test
 	public void testSubmitNullJobGraph() {
-		new JavaTestKit(system) {{
-			final int numSlots = 20;
-
-			final ForkableFlinkMiniCluster cluster =
-					ForkableFlinkMiniCluster.startCluster(numSlots/2, 2,
-							TestingUtils.TESTING_DURATION().toString());
-
-			final ActorRef jobClient = cluster.getJobClient();
-
+		try {
 			try {
-				new Within(TestingUtils.TESTING_DURATION()) {
-
-					@Override
-					protected void run() {
-						try {
-							submitJob(null, jobClient);
-							fail("Expected JobSubmissionException.");
-						} catch (JobSubmissionException e) {
-							assertEquals("JobGraph must not be null.", e.getMessage());
-						} catch (Throwable t) {
-							fail("Caught wrong exception of type " + t.getClass() + ".");
-							t.printStackTrace();
-						}
-
-						try {
-							JobClient.submitJobAndWait(workingJobGraph, false, jobClient,
-									TestingUtils.TESTING_DURATION());
-						} catch (Throwable t) {
-							fail("Caught unexpected exception " + t.getMessage() + ".");
-						}
-					}
-				};
-			} finally {
-				cluster.stop();
+				submitJob(null);
+				fail("Expected NullPointerException.");
+			}
+			catch (NullPointerException e) {
+				// expected: submitting a null JobGraph fails immediately on the client side
+			}
+			catch (Throwable t) {
+				t.printStackTrace();
+				fail("Caught wrong exception of type " + t.getClass() + ".");
 			}
-		}};
+
+			cluster.submitJobAndWait(workingJobGraph, false);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
 	}
 
+	// --------------------------------------------------------------------------------------------
+	
 	public static class FailingJobVertex extends AbstractJobVertex {
+		private static final long serialVersionUID = -6365291240199412135L;
+
 		public FailingJobVertex(final String msg) {
 			super(msg);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java
index 19cf611..a739855 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java
@@ -39,7 +39,6 @@ import org.apache.flink.util.Collector;
 /**
  * Tests whether the system recovers from a runtime exception from the user code.
  */
-@SuppressWarnings("deprecation")
 public class TaskFailureITCase extends FailingTestBase {
 
 	private static final int parallelism = 4;
@@ -51,7 +50,8 @@ public class TaskFailureITCase extends FailingTestBase {
 											"1 1\n9 1\n5 9\n4 4\n4 4\n6 6\n7 7\n8 8\n";
 
 	// expected result of working map job
-	private static final String MAP_RESULT = "1 11\n2 12\n4 14\n4 14\n1 11\n2 12\n2 12\n4 14\n4 14\n3 16\n1 11\n2 12\n2 12\n0 13\n4 14\n1 11\n4 14\n4 14\n";
+	private static final String MAP_RESULT = "1 11\n2 12\n4 14\n4 14\n1 11\n2 12\n2 12\n4 14\n4 14\n" +
+											"3 16\n1 11\n2 12\n2 12\n0 13\n4 14\n1 11\n4 14\n4 14\n";
 
 	private String inputPath;
 	private String resultPath;

http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
index 2901bf8..c102c1f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
@@ -112,10 +112,10 @@ public abstract class AbstractProcessFailureRecoveryTest {
 			Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
 
 			Configuration jmConfig = new Configuration();
-			jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "500 ms");
-			jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s");
+			jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1 s");
+			jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "4 s");
 			jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 2);
-			jmConfig.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "2 s");
+			jmConfig.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "4 s");
 
 			jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<Tuple2<String, Object>>(localAddress));
 			ActorRef jmActor = JobManager.startJobManagerActors(jmConfig, jmActorSystem)._1();

http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-tests/src/test/java/org/apache/flink/test/util/FailingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/FailingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/util/FailingTestBase.java
index e71e0bf..f3aa3e8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/util/FailingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/FailingTestBase.java
@@ -18,10 +18,8 @@
 
 package org.apache.flink.test.util;
 
-import akka.actor.ActorRef;
 import org.junit.Assert;
 
-import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 
@@ -29,6 +27,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
  * Base class for integration tests which test whether the system recovers from failed executions.
  */
 public abstract class FailingTestBase extends RecordAPITestBase {
+	
 	/**
 	 * Returns the {@link JobGraph} of the failing job. 
 	 * 
@@ -116,13 +115,16 @@ public abstract class FailingTestBase extends RecordAPITestBase {
 	 */
 	private class SubmissionThread extends Thread {
 
-		// reference to the timeout thread
+		/** reference to the timeout thread */
 		private final Thread timeoutThread;
-		// cluster to submit the job to.
+		
+		/** cluster to submit the job to. */
 		private final ForkableFlinkMiniCluster executor;
-		// job graph of the failing job (submitted first)
+		
+		/** job graph of the failing job (submitted first) */
 		private final JobGraph failingJob;
-		// job graph of the working job (submitted after return from failing job)
+		
+		/** job graph of the working job (submitted after return from failing job) */
 		private final JobGraph job;
 		
 		private volatile Exception error;
@@ -142,24 +144,24 @@ public abstract class FailingTestBase extends RecordAPITestBase {
 		 */
 		@Override
 		public void run() {
-			ActorRef client = this.executor.getJobClient();
-
 			try {
 				// submit failing job
-				JobClient.submitJobAndWait(this.failingJob, false, client, executor.timeout());
-				
+				this.executor.submitJobAndWait(this.failingJob, false);
 				this.error = new Exception("The job did not fail.");
-			} catch(JobExecutionException jee) {
+			}
+			catch(JobExecutionException jee) {
 				// as expected
-			} catch (Exception e) {
+			}
+			catch (Exception e) {
 				this.error = e;
 			}
 			
 			
 			try {
 				// submit working job
-				JobClient.submitJobAndWait(this.job, false, client, executor.timeout());
-			} catch (Exception e) {
+				this.executor.submitJobAndWait(this.job, false);
+			}
+			catch (Exception e) {
 				this.error = e;
 			}
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
index 12e0e5b..3e17225 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
@@ -102,7 +102,7 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
 
       try {
         within(TestingUtils.TESTING_DURATION) {
-          jm ! SubmitJob(jobGraph)
+          jm ! SubmitJob(jobGraph, false)
           expectMsg(Success(jobGraph.getJobID))
 
           tm ! NotifyWhenJobManagerTerminated(jm)
@@ -117,7 +117,7 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
 
           cluster.waitForTaskManagersToBeRegistered()
 
-          jm ! SubmitJob(jobGraph2)
+          jm ! SubmitJob(jobGraph2, false)
 
           val failure = expectMsgType[Success]
 


[5/5] flink git commit: [tests] Cleanup sysout logging in tests

Posted by se...@apache.org.
[tests] Cleanup sysout logging in tests


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/69a400fa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/69a400fa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/69a400fa

Branch: refs/heads/master
Commit: 69a400fadd258fe0a1ff0b5670a3611fda4c1cdf
Parents: e79813b
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Apr 13 16:37:16 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Apr 13 16:38:22 2015 +0200

----------------------------------------------------------------------
 .../operations/DegreesWithExceptionITCase.java  | 64 +++++++++++---------
 .../flink/test/misc/AutoParallelismITCase.java  |  1 +
 .../flink/test/misc/NullValuesITCase.java       |  1 +
 .../test/recovery/SimpleRecoveryITCase.java     |  4 ++
 .../TaskManagerFailureRecoveryITCase.java       |  1 +
 5 files changed, 43 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
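
The change applied across these five files is the flag introduced in [1/5]: each test environment switches off sysout progress logging. A minimal, self-contained sketch of the pattern follows; the host and port are placeholders standing in for cluster.getJobManagerRPCPort():

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.DiscardingOutputFormat;

    public class SysoutQuietTestSketch {
        public static void main(String[] args) throws Exception {
            // placeholder address; the tests use cluster.getJobManagerRPCPort()
            ExecutionEnvironment env =
                    ExecutionEnvironment.createRemoteEnvironment("localhost", 6123);

            env.setParallelism(4);
            env.getConfig().disableSysoutLogging(); // no per-task status lines on stdout

            DataSet<Long> data = env.generateSequence(1, 10);
            data.output(new DiscardingOutputFormat<Long>());
            env.execute();
        }
    }
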


http://git-wip-us.apache.org/repos/asf/flink/blob/69a400fa/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
index 489e50a..b04b24e 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
@@ -21,11 +21,13 @@ package org.apache.flink.graph.test.operations;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
+
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.test.TestGraphUtils;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -37,17 +39,19 @@ public class DegreesWithExceptionITCase {
 	private static final int PARALLELISM = 4;
 
 	private static ForkableFlinkMiniCluster cluster;
-
-	@BeforeClass
-	public static void suppressOutput() {
-		TestGraphUtils.pipeSystemOutToNull();
-	}
+	
 
 	@BeforeClass
 	public static void setupCluster() {
-		Configuration config = new Configuration();
-		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
-		cluster = new ForkableFlinkMiniCluster(config, false);
+		try {
+			Configuration config = new Configuration();
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
+			cluster = new ForkableFlinkMiniCluster(config, false);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail("Error starting test cluster: " + e.getMessage());
+		}
 	}
 
 	@AfterClass
@@ -56,7 +60,6 @@ public class DegreesWithExceptionITCase {
 			cluster.stop();
 		}
 		catch (Throwable t) {
-			System.err.println("Error stopping cluster on shutdown");
 			t.printStackTrace();
 			fail("Cluster shutdown caused an exception: " + t.getMessage());
 		}
@@ -70,9 +73,9 @@ public class DegreesWithExceptionITCase {
 
 		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
 				"localhost", cluster.getJobManagerRPCPort());
-
 		env.setParallelism(PARALLELISM);
-
+		env.getConfig().disableSysoutLogging();
+		
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeInvalidSrcData(env), env);
 
@@ -86,15 +89,16 @@ public class DegreesWithExceptionITCase {
 		}
 	}
 
+	/**
+	 * Test inDegrees() with an edge having a trgId that does not exist in the vertex DataSet
+	 */
 	@Test
 	public void testInDegreesInvalidEdgeTrgId() throws Exception {
-		/*
-		* Test inDegrees() with an edge having a trgId that does not exist in the vertex DataSet
-		*/
+
 		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
 				"localhost", cluster.getJobManagerRPCPort());
-
 		env.setParallelism(PARALLELISM);
+		env.getConfig().disableSysoutLogging();
 
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeInvalidTrgData(env), env);
@@ -109,15 +113,16 @@ public class DegreesWithExceptionITCase {
 		}
 	}
 
+	/**
+	 * Test getDegrees() with an edge having a trgId that does not exist in the vertex DataSet
+	 */
 	@Test
 	public void testGetDegreesInvalidEdgeTrgId() throws Exception {
-		/*
-		* Test getDegrees() with an edge having a trgId that does not exist in the vertex DataSet
-		*/
+
 		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
 				"localhost", cluster.getJobManagerRPCPort());
-
 		env.setParallelism(PARALLELISM);
+		env.getConfig().disableSysoutLogging();
 
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeInvalidTrgData(env), env);
@@ -132,15 +137,16 @@ public class DegreesWithExceptionITCase {
 		}
 	}
 
+	/**
+	 * Test getDegrees() with an edge having a srcId that does not exist in the vertex DataSet
+	 */
 	@Test
 	public void testGetDegreesInvalidEdgeSrcId() throws Exception {
-		/*
-		* Test getDegrees() with an edge having a srcId that does not exist in the vertex DataSet
-		*/
+
 		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
 				"localhost", cluster.getJobManagerRPCPort());
-
 		env.setParallelism(PARALLELISM);
+		env.getConfig().disableSysoutLogging();
 
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeInvalidSrcData(env), env);
@@ -155,15 +161,16 @@ public class DegreesWithExceptionITCase {
 		}
 	}
 
+	/**
+	 * Test getDegrees() with an edge having a srcId and a trgId that do not exist in the vertex DataSet
+	 */
 	@Test
 	public void testGetDegreesInvalidEdgeSrcTrgId() throws Exception {
-		/*
-		* Test getDegrees() with an edge having a srcId and a trgId that does not exist in the vertex DataSet
-		*/
+
 		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
 				"localhost", cluster.getJobManagerRPCPort());
-
 		env.setParallelism(PARALLELISM);
+		env.getConfig().disableSysoutLogging();
 
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeInvalidSrcTrgData(env), env);
@@ -173,7 +180,8 @@ public class DegreesWithExceptionITCase {
 			env.execute();
 
 			fail("graph.getDegrees() did not fail.");
-		} catch (Exception e) {
+		}
+		catch (Exception e) {
 			// We expect the job to fail with an exception
 		}
 	}

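All of the tests in this file now share one expected-failure idiom: execute the job and treat normal completion as the assertion failure. Distilled from the hunks above (env and the fail() import are as in the file):

    try {
        env.execute();
        // reaching this line means the invalid edge id went undetected
        fail("graph.getDegrees() did not fail.");
    }
    catch (Exception e) {
        // expected: the job fails on the edge whose src/trg id has no matching vertex
    }
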
http://git-wip-us.apache.org/repos/asf/flink/blob/69a400fa/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
index aeab77b..df190d7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
@@ -79,6 +79,7 @@ public class AutoParallelismITCase {
 					"localhost", cluster.getJobManagerRPCPort());
 
 			env.setParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
+			env.getConfig().disableSysoutLogging();
 
 			DataSet<Integer> result = env
 					.createInput(new ParallelismDependentInputFormat())

http://git-wip-us.apache.org/repos/asf/flink/blob/69a400fa/flink-tests/src/test/java/org/apache/flink/test/misc/NullValuesITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/NullValuesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/NullValuesITCase.java
index e0ebadd..6f7d002 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/NullValuesITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/NullValuesITCase.java
@@ -49,6 +49,7 @@ public class NullValuesITCase {
 					ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getJobManagerRPCPort());
 
 			env.setParallelism(1);
+			env.getConfig().disableSysoutLogging();
 
 			DataSet<String> data = env.fromElements("hallo")
 					.map(new MapFunction<String, String>() {

http://git-wip-us.apache.org/repos/asf/flink/blob/69a400fa/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
index 0361af8..e61e551 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
@@ -81,6 +81,7 @@ public class SimpleRecoveryITCase {
 
 				env.setParallelism(4);
 				env.setNumberOfExecutionRetries(0);
+				env.getConfig().disableSysoutLogging();
 
 				env.generateSequence(1, 10)
 						.rebalance()
@@ -110,6 +111,7 @@ public class SimpleRecoveryITCase {
 
 				env.setParallelism(4);
 				env.setNumberOfExecutionRetries(0);
+				env.getConfig().disableSysoutLogging();
 
 				env.generateSequence(1, 10)
 						.rebalance()
@@ -156,6 +158,7 @@ public class SimpleRecoveryITCase {
 
 			env.setParallelism(4);
 			env.setNumberOfExecutionRetries(1);
+			env.getConfig().disableSysoutLogging();
 
 			env.generateSequence(1, 10)
 					.rebalance()
@@ -200,6 +203,7 @@ public class SimpleRecoveryITCase {
 
 			env.setParallelism(4);
 			env.setNumberOfExecutionRetries(5);
+			env.getConfig().disableSysoutLogging();
 
 			env.generateSequence(1, 10)
 					.rebalance()

http://git-wip-us.apache.org/repos/asf/flink/blob/69a400fa/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
index aa92925..1c9f798 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
@@ -87,6 +87,7 @@ public class TaskManagerFailureRecoveryITCase {
 
 			env.setParallelism(PARALLELISM);
 			env.setNumberOfExecutionRetries(1);
+			env.getConfig().disableSysoutLogging();
 
 			env.generateSequence(1, 10)
 					.map(new FailingMapper<Long>())


[3/5] flink git commit: [FLINK-1879] [client] Simplify JobClient. Hide actorRefs behind method calls where possible.

Posted by se...@apache.org.
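
The FailingTestBase hunk earlier in this thread shows the shape of this change concretely; a rough before/after, reconstructed from its removed and added lines (signatures abbreviated, not authoritative):

    // before: raw ActorRef plumbing in test code
    //   ActorRef client = executor.getJobClient();
    //   JobClient.submitJobAndWait(jobGraph, false, client, executor.timeout());
    //
    // after: the actor is hidden behind a method on the mini cluster
    //   executor.submitJobAndWait(jobGraph, false);

In the Scala tests below, the same commit threads an explicit boolean through the SubmitJob message, as in SubmitJob(jobGraph, false). Read together with [1/5], the flag plausibly controls whether the submitter receives execution progress updates, with false keeping the tests quiet; this is an inference from these diffs, not a documented contract.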
http://git-wip-us.apache.org/repos/asf/flink/blob/ad637077/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
index 7449215..0b6c981 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
@@ -101,7 +101,7 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
 
       try {
         within(TestingUtils.TESTING_DURATION) {
-          jm ! SubmitJob(jobGraph)
+          jm ! SubmitJob(jobGraph, false)
           expectMsg(Success(jobGraph.getJobID))
 
           jm ! WaitForAllVerticesToBeRunningOrFinished(jobID)
@@ -148,7 +148,7 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
 
       try {
         within(TestingUtils.TESTING_DURATION) {
-          jm ! SubmitJob(jobGraph)
+          jm ! SubmitJob(jobGraph, false)
           expectMsg(Success(jobGraph.getJobID))
 
           jm ! WaitForAllVerticesToBeRunningOrFinished(jobID)
@@ -191,7 +191,7 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
 
       try{
         within(TestingUtils.TESTING_DURATION){
-          jm ! SubmitJob(jobGraph)
+          jm ! SubmitJob(jobGraph, false)
           expectMsg(Success(jobGraph.getJobID))
 
           tm ! PoisonPill
@@ -213,7 +213,7 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
 
           expectMsg(RegisteredAtJobManager)
 
-          jm ! SubmitJob(jobGraph2)
+          jm ! SubmitJob(jobGraph2, false)
 
           expectMsgType[Success]