You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2015/03/31 10:49:26 UTC

[4/4] flink git commit: [FLINK-1771] Add support for submitting single jobs to a detached YARN session

[FLINK-1771] Add support for submitting single jobs to a detached YARN session

With this change, users can submit a Flink job to a YARN cluster without having a local client monitoring the Application Master or job. You can essentially fire-and-forget a Flink job to YARN.
To support this, the ApplicationMaster can now monitor the status of a job and shut itself down once the job reaches a terminal state.

The change also verifies that various ways of setting the parallelism on YARN are passed through the system correctly (per job, session).

There was a bug in YARN container creation that made the configuration values for the heap offset useless; this change fixes that error.

All mentioned features and bugs are covered by the flink-yarn-tests.

This closes #542


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6b0d4076
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6b0d4076
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6b0d4076

Branch: refs/heads/master
Commit: 6b0d40764da9dce2e2d21882e9a03a21c6783ff0
Parents: 121a5a0
Author: Robert Metzger <rm...@apache.org>
Authored: Fri Mar 13 15:53:51 2015 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Tue Mar 31 08:56:39 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    | 116 ++++++---
 .../flink/client/FlinkYarnSessionCli.java       |  12 +-
 .../org/apache/flink/client/LocalExecutor.java  |  19 +-
 .../org/apache/flink/client/RemoteExecutor.java |  27 ++-
 .../org/apache/flink/client/program/Client.java |  85 ++++---
 .../client/program/ContextEnvironment.java      |  39 ++-
 .../flink/client/CliFrontendInfoTest.java       |   4 +-
 .../flink/client/CliFrontendListCancelTest.java |   2 +-
 .../apache/flink/client/CliFrontendRunTest.java |   2 +-
 .../client/program/ClientConnectionTest.java    |   2 +-
 .../apache/flink/client/program/ClientTest.java |   9 +-
 .../ExecutionPlanAfterExecutionTest.java        |   3 +-
 .../program/ExecutionPlanCreationTest.java      |   2 +-
 .../flink/api/common/JobExecutionResult.java    |   7 +-
 .../java/org/apache/flink/api/common/JobID.java |  60 +++++
 .../flink/api/common/JobSubmissionResult.java   |  39 +++
 .../common/operators/CollectionExecutor.java    |   2 +-
 .../main/flink-bin/conf/log4j-cli.properties    |   1 +
 .../examples/java/wordcount/WordCount.java      |   1 -
 flink-optimizer/pom.xml                         |  19 +-
 .../org/apache/flink/optimizer/Optimizer.java   |  20 +-
 .../optimizer/AdditionalOperatorsTest.java      |   1 +
 .../optimizer/BranchingPlansCompilerTest.java   |   1 +
 .../BroadcastVariablePipelinebreakerTest.java   |   1 +
 .../CachedMatchStrategyCompilerTest.java        |   1 +
 .../optimizer/CoGroupSolutionSetFirstTest.java  |   1 +
 .../flink/optimizer/CompilerTestBase.java       | 229 ------------------
 .../flink/optimizer/DisjointDataFlowsTest.java  |   1 +
 .../optimizer/DistinctCompilationTest.java      |   1 +
 .../apache/flink/optimizer/GroupOrderTest.java  |   1 +
 .../optimizer/HardPlansCompilationTest.java     |   1 +
 .../flink/optimizer/IterationsCompilerTest.java |   1 +
 .../flink/optimizer/NestedIterationsTest.java   |   1 +
 .../flink/optimizer/ParallelismChangeTest.java  |   1 +
 .../flink/optimizer/PartitionPushdownTest.java  |   1 +
 .../optimizer/PartitioningReusageTest.java      |   1 +
 .../flink/optimizer/PipelineBreakerTest.java    |   1 +
 .../flink/optimizer/PropertyDataSourceTest.java |   1 +
 .../apache/flink/optimizer/ReduceAllTest.java   |   1 +
 .../optimizer/ReplicatingDataSourceTest.java    |   1 +
 .../SemanticPropertiesAPIToPlanTest.java        |   1 +
 .../flink/optimizer/SortPartialReuseTest.java   |   1 +
 .../UnionBetweenDynamicAndStaticPathTest.java   |   1 +
 .../optimizer/UnionPropertyPropagationTest.java |   1 +
 .../flink/optimizer/UnionReplacementTest.java   |   1 +
 .../WorksetIterationCornerCasesTest.java        |   1 +
 .../WorksetIterationsRecordApiCompilerTest.java |   1 +
 ...naryCustomPartitioningCompatibilityTest.java |   2 +-
 .../CoGroupCustomPartitioningTest.java          |   2 +-
 ...ustomPartitioningGlobalOptimizationTest.java |   2 +-
 .../custompartition/CustomPartitioningTest.java |   2 +-
 .../GroupingKeySelectorTranslationTest.java     |   2 +-
 .../GroupingPojoTranslationTest.java            |   2 +-
 .../GroupingTupleTranslationTest.java           |   2 +-
 .../JoinCustomPartitioningTest.java             |   2 +-
 .../DataExchangeModeClosedBranchingTest.java    |   2 +-
 .../DataExchangeModeForwardTest.java            |   2 +-
 .../DataExchangeModeOpenBranchingTest.java      |   2 +-
 .../java/DeltaIterationDependenciesTest.java    |   2 +-
 .../java/DistinctAndGroupingOptimizerTest.java  |   2 +-
 .../java/GroupReduceCompilationTest.java        |   2 +-
 .../optimizer/java/IterationCompilerTest.java   |   2 +-
 .../optimizer/java/JoinTranslationTest.java     |   2 +-
 .../flink/optimizer/java/OpenIterationTest.java |   2 +-
 .../optimizer/java/PartitionOperatorTest.java   |   2 +-
 .../optimizer/java/ReduceCompilationTest.java   |   2 +-
 .../WorksetIterationsJavaApiCompilerTest.java   |   2 +-
 .../CoGroupOnConflictingPartitioningsTest.java  |   2 +-
 .../JoinOnConflictingPartitioningsTest.java     |   2 +-
 .../flink/optimizer/util/CompilerTestBase.java  | 240 +++++++++++++++++++
 .../flink/optimizer/util/OperatorResolver.java  | 127 ++++++++++
 flink-quickstart/pom.xml                        |   2 +-
 .../runtime/accumulators/AccumulatorEvent.java  |   2 +-
 .../apache/flink/runtime/blob/BlobClient.java   |   2 +-
 .../apache/flink/runtime/blob/BlobServer.java   |   2 +-
 .../runtime/blob/BlobServerConnection.java      |   2 +-
 .../apache/flink/runtime/blob/BlobUtils.java    |   2 +-
 .../client/JobCancellationException.java        |   2 +-
 .../runtime/client/JobExecutionException.java   |   2 +-
 .../flink/runtime/client/JobStatusMessage.java  |   2 +-
 .../runtime/client/JobSubmissionException.java  |   2 +-
 .../runtime/client/JobTimeoutException.java     |   2 +-
 .../deployment/TaskDeploymentDescriptor.java    |   2 +-
 .../flink/runtime/execution/Environment.java    |   2 +-
 .../runtime/execution/RuntimeEnvironment.java   |   2 +-
 .../librarycache/BlobLibraryCacheManager.java   |   2 +-
 .../FallbackLibraryCacheManager.java            |   2 +-
 .../librarycache/LibraryCacheManager.java       |   2 +-
 .../runtime/executiongraph/ExecutionGraph.java  |   2 +-
 .../executiongraph/ExecutionJobVertex.java      |   2 +-
 .../runtime/executiongraph/ExecutionVertex.java |   2 +-
 .../flink/runtime/filecache/FileCache.java      |   2 +-
 .../apache/flink/runtime/instance/Instance.java |   2 +-
 .../flink/runtime/instance/SharedSlot.java      |   2 +-
 .../flink/runtime/instance/SimpleSlot.java      |   2 +-
 .../org/apache/flink/runtime/instance/Slot.java |   2 +-
 .../runtime/io/network/NetworkEnvironment.java  |   2 +-
 .../io/network/partition/ResultPartition.java   |   2 +-
 .../ResultPartitionConsumableNotifier.java      |   3 +-
 .../apache/flink/runtime/jobgraph/JobGraph.java |   1 +
 .../apache/flink/runtime/jobgraph/JobID.java    |  61 -----
 .../accumulators/AccumulatorManager.java        |   2 +-
 .../jobmanager/web/JobManagerInfoServlet.java   |   2 +-
 .../runtime/profiling/JobManagerProfiler.java   |   2 +-
 .../profiling/impl/EnvironmentThreadSet.java    |   2 +-
 .../InternalExecutionVertexProfilingData.java   |   2 +-
 ...ernalExecutionVertexThreadProfilingData.java |   2 +-
 .../profiling/types/InstanceProfilingEvent.java |   2 +-
 .../types/InstanceSummaryProfilingEvent.java    |   2 +-
 .../runtime/profiling/types/ProfilingEvent.java |   2 +-
 .../types/SingleInstanceProfilingEvent.java     |   2 +-
 .../profiling/types/ThreadProfilingEvent.java   |   2 +-
 .../profiling/types/VertexProfilingEvent.java   |   2 +-
 .../apache/flink/runtime/taskmanager/Task.java  |   2 +-
 .../runtime/taskmanager/TaskExecutionState.java |   2 +-
 .../taskmanager/TaskInputSplitProvider.java     |   2 +-
 .../runtime/yarn/AbstractFlinkYarnClient.java   |  84 ++++++-
 .../runtime/yarn/AbstractFlinkYarnCluster.java  |  29 +++
 .../apache/flink/runtime/client/JobClient.scala |   4 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   6 +-
 .../runtime/jobmanager/MemoryArchivist.scala    |   2 +-
 .../StreamCheckpointCoordinator.scala           |   3 +-
 .../runtime/messages/ArchiveMessages.scala      |   2 +-
 .../messages/ExecutionGraphMessages.scala       |   3 +-
 .../runtime/messages/JobManagerMessages.scala   |   3 +-
 .../minicluster/LocalFlinkMiniCluster.scala     |   4 +
 .../flink/runtime/blob/BlobClientTest.java      |   2 +-
 .../runtime/blob/BlobServerDeleteTest.java      |   2 +-
 .../flink/runtime/blob/BlobServerPutTest.java   |   2 +-
 .../TaskDeploymentDescriptorTest.java           |   2 +-
 .../BlobLibraryCacheManagerTest.java            |   2 +-
 .../ExecutionGraphConstructionTest.java         |   2 +-
 .../ExecutionGraphDeploymentTest.java           |   2 +-
 .../executiongraph/ExecutionGraphTestUtils.java |   2 +-
 .../ExecutionStateProgressTest.java             |   2 +-
 .../ExecutionVertexCancelTest.java              |   2 +-
 .../ExecutionVertexDeploymentTest.java          |   2 +-
 .../ExecutionVertexSchedulingTest.java          |   2 +-
 .../executiongraph/PointwisePatternTest.java    |   2 +-
 .../executiongraph/VertexSlotSharingTest.java   |   2 +-
 .../FileCacheDeleteValidationTest.java          |   2 +-
 .../flink/runtime/instance/InstanceTest.java    |   2 +-
 .../flink/runtime/instance/SimpleSlotTest.java  |   2 +-
 .../consumer/LocalInputChannelTest.java         |   2 +-
 .../flink/runtime/jobgraph/JobIdTest.java       |   1 +
 .../scheduler/SchedulerTestUtils.java           |   2 +-
 .../jobmanager/scheduler/SharedSlotsTest.java   |   2 +-
 .../scheduler/SlotAllocationFutureTest.java     |   2 +-
 .../operators/testutils/MockEnvironment.java    |   2 +-
 .../profiling/types/ProfilingTypesTest.java     |   5 +-
 .../taskmanager/TaskExecutionStateTest.java     |   2 +-
 .../runtime/taskmanager/TaskManagerTest.java    |   2 +-
 .../flink/runtime/taskmanager/TaskTest.java     |   2 +-
 .../ExecutionGraphRestartTest.scala             |   3 +-
 .../TaskManagerLossFailsTasksTest.scala         |   3 +-
 .../testingUtils/TestingJobManager.scala        |   3 +-
 .../TestingJobManagerMessages.scala             |   3 +-
 .../testingUtils/TestingTaskManager.scala       |   2 +-
 .../TestingTaskManagerMessages.scala            |   2 +-
 .../api/avro/AvroExternalJarProgramITCase.java  |   2 +-
 flink-staging/flink-spargel/pom.xml             |   7 +
 .../flink/spargel/java/SpargelCompilerTest.java |   2 +-
 .../environment/RemoteStreamEnvironment.java    |  11 +-
 .../environment/StreamContextEnvironment.java   |  18 +-
 .../test/compiler/util/CompilerTestBase.java    | 201 ----------------
 .../test/compiler/util/OperatorResolver.java    | 127 ----------
 .../flink/test/util/RecordAPITestBase.java      |   2 +-
 .../apache/flink/test/util/TestEnvironment.java |   2 +-
 flink-tests/pom.xml                             |   8 +
 .../test/cancelling/CancellingTestBase.java     |   2 +-
 .../compiler/examples/KMeansSingleStepTest.java |   4 +-
 .../examples/RelationalQueryCompilerTest.java   |   4 +-
 .../examples/WordCountCompilerTest.java         |   2 +-
 .../ConnectedComponentsCoGroupTest.java         |   2 +-
 .../iterations/ConnectedComponentsTest.java     |   2 +-
 .../iterations/IterativeKMeansTest.java         |   4 +-
 ...ultipleJoinsWithSolutionSetCompilerTest.java |   2 +-
 .../iterations/PageRankCompilerTest.java        |   2 +-
 .../compiler/plandump/DumpCompiledPlanTest.java |   2 +-
 .../test/failingPrograms/TaskFailureITCase.java |   4 +-
 .../flink/test/operators/ReduceITCase.java      |   2 +-
 .../flink/test/operators/UnionSinkITCase.java   |   2 +-
 .../PartitionOperatorTranslationTest.scala      |   2 +-
 .../CoGroupCustomPartitioningTest.scala         |   2 +-
 .../CoGroupGroupSortTranslationTest.scala       |   2 +-
 ...tomPartitioningGroupingKeySelectorTest.scala |   2 +-
 .../CustomPartitioningGroupingPojoTest.scala    |   2 +-
 .../CustomPartitioningGroupingTupleTest.scala   |   2 +-
 .../translation/CustomPartitioningTest.scala    |   2 +-
 .../JoinCustomPartitioningTest.scala            |   2 +-
 .../YARNSessionCapacitySchedulerITCase.java     |   6 +-
 .../flink/yarn/YARNSessionFIFOITCase.java       | 183 +++++++++++---
 .../org/apache/flink/yarn/YarnTestBase.java     |  78 +++++-
 .../src/main/resources/log4j-test.properties    |   2 +-
 .../org/apache/flink/yarn/FlinkYarnClient.java  |  29 ++-
 .../org/apache/flink/yarn/FlinkYarnCluster.java | 144 ++++++++---
 .../main/java/org/apache/flink/yarn/Utils.java  |   7 +-
 .../apache/flink/yarn/ApplicationClient.scala   |  20 +-
 .../flink/yarn/ApplicationMasterActor.scala     |  70 +++++-
 .../scala/org/apache/flink/yarn/Messages.scala  |  10 +
 .../java/org/apache/flink/yarn/UtilsTests.java  |  18 +-
 pom.xml                                         |   2 +-
 202 files changed, 1493 insertions(+), 1003 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index dd2a0ba..a13b322 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -42,6 +42,7 @@ import akka.pattern.Patterns;
 import akka.util.Timeout;
 
 import org.apache.commons.cli.CommandLine;
+import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.client.cli.CancelOptions;
 import org.apache.flink.client.cli.CliArgsException;
 import org.apache.flink.client.cli.CliFrontendParser;
@@ -66,7 +67,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
@@ -265,12 +266,32 @@ public class CliFrontend {
 		}
 
 		try {
-			Client client = getClient(options, program.getUserCodeClassLoader(), program.getMainClassName());
-
-			int parallelism = options.getParallelism();
-			int exitCode = executeProgram(program, client, parallelism);
-
-			if (yarnCluster != null) {
+			int userParallelism = options.getParallelism();
+			LOG.debug("User parallelism is set to {}", userParallelism);
+
+			Client client = getClient(options, program.getUserCodeClassLoader(), program.getMainClassName(), userParallelism);
+			LOG.debug("Client slots is set to {}", client.getMaxSlots());
+			if(client.getMaxSlots() != -1 && userParallelism == -1) {
+				logAndSysout("Using the parallelism provided by the remote cluster ("+client.getMaxSlots()+"). " +
+						"To use another parallelism, set it at the ./bin/flink client.");
+				userParallelism = client.getMaxSlots();
+			}
+			int exitCode = 0;
+
+			// check if detached per job yarn cluster is used to start flink
+			if(yarnCluster != null && yarnCluster.isDetached()) {
+				logAndSysout("The Flink YARN client has been started in detached mode. In order to stop " +
+						"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
+						"yarn application -kill "+yarnCluster.getApplicationId()+"\n" +
+						"Please also note that the temporary files of the YARN session in the home directoy will not be removed.");
+				executeProgram(program, client, userParallelism, false);
+			} else {
+				// regular (blocking) execution.
+				exitCode = executeProgram(program, client, userParallelism, true);
+			}
+
+			// show YARN cluster status if its not a detached YARN cluster.
+			if (yarnCluster != null && !yarnCluster.isDetached()) {
 				List<String> msgs = yarnCluster.getNewMessages();
 				if (msgs != null && msgs.size() > 1) {
 
@@ -291,7 +312,7 @@ public class CliFrontend {
 			return handleError(t);
 		}
 		finally {
-			if (yarnCluster != null) {
+			if (yarnCluster != null && !yarnCluster.isDetached()) {
 				logAndSysout("Shutting down YARN cluster");
 				yarnCluster.shutdown();
 			}
@@ -346,7 +367,7 @@ public class CliFrontend {
 			int parallelism = options.getParallelism();
 
 			LOG.info("Creating program plan dump");
-			Client client = getClient(options, program.getUserCodeClassLoader(), program.getMainClassName());
+			Client client = getClient(options, program.getUserCodeClassLoader(), program.getMainClassName(), parallelism);
 			String jsonPlan = client.getOptimizedPlanAsJson(program, parallelism);
 
 			if (jsonPlan != null) {
@@ -555,12 +576,12 @@ public class CliFrontend {
 	//  Interaction with programs and JobManager
 	// --------------------------------------------------------------------------------------------
 
-	protected int executeProgram(PackagedProgram program, Client client, int parallelism) {
-		LOG.info("Starting execution or program");
-		JobExecutionResult execResult;
+	protected int executeProgram(PackagedProgram program, Client client, int parallelism, boolean wait) {
+		LOG.info("Starting execution of program");
+		JobSubmissionResult execResult;
 		try {
 			client.setPrintStatusDuringExecution(true);
-			execResult = client.run(program, parallelism, true);
+			execResult = client.run(program, parallelism, wait);
 		}
 		catch (ProgramInvocationException e) {
 			return handleError(e);
@@ -569,15 +590,33 @@ public class CliFrontend {
 			program.deleteExtractedLibraries();
 		}
 
-		LOG.info("Program execution finished");
+		if(wait) {
+			LOG.info("Program execution finished");
+		}
 
-		// we come here after the job has finished
+		// we come here after the job has finished (or the job has been submitted)
 		if (execResult != null) {
-			System.out.println("Job Runtime: " + execResult.getNetRuntime());
-			Map<String, Object> accumulatorsResult = execResult.getAllAccumulatorResults();
-			if (accumulatorsResult.size() > 0) {
-				System.out.println("Accumulator Results: ");
-				System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult));
+			// if the job has been submitted to a detached YARN cluster, there won't be any
+			// exec results, but the object will be set (for the job id)
+			if (yarnCluster != null && yarnCluster.isDetached()) {
+				if(execResult.getJobID() == null) {
+					throw new RuntimeException("Error while starting job. No Job ID set.");
+				}
+				yarnCluster.stopAfterJob(execResult.getJobID());
+				yarnCluster.disconnect();
+				System.out.println("The Job has been submitted with JobID "+execResult.getJobID());
+				return 0;
+			}
+			if (execResult instanceof JobExecutionResult) {
+				JobExecutionResult result = (JobExecutionResult) execResult;
+				System.out.println("Job Runtime: " + result.getNetRuntime());
+				Map<String, Object> accumulatorsResult = result.getAllAccumulatorResults();
+				if (accumulatorsResult.size() > 0) {
+					System.out.println("Accumulator Results: ");
+					System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult));
+				}
+			} else {
+				LOG.info("The Job did not return an execution result");
 			}
 		}
 		return 0;
@@ -681,26 +720,47 @@ public class CliFrontend {
 		LOG.info("JobManager is at " + jmActor.path());
 		return jmActor;
 	}
-	
 
-	
-	protected Client getClient(CommandLineOptions options, ClassLoader classLoader, String programName) throws Exception {
 
+	/**
+	 *
+	 * @param options
+	 * @param classLoader
+	 * @param programName
+	 * @param userParallelism The parallelism requested by the user in the CLI frontend.
+	 * @return
+	 * @throws Exception
+	 */
+	protected Client getClient(CommandLineOptions options, ClassLoader classLoader, String programName, int userParallelism) throws Exception {
 		InetSocketAddress jobManagerAddress;
-
+		int maxSlots = -1;
 		if (YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) {
 			logAndSysout("YARN cluster mode detected. Switching Log4j output to console");
 
 			// user wants to run Flink in YARN cluster.
 			CommandLine commandLine = options.getCommandLine();
-			AbstractFlinkYarnClient flinkYarnClient =
-					CliFrontendParser.getFlinkYarnSessionCli().createFlinkYarnClient(commandLine);
+			AbstractFlinkYarnClient flinkYarnClient = CliFrontendParser.getFlinkYarnSessionCli().createFlinkYarnClient(commandLine);
 
 			if (flinkYarnClient == null) {
 				throw new RuntimeException("Unable to create Flink YARN Client. Check previous log messages");
 			}
+
+			// the number of slots available from YARN:
+			int yarnTmSlots = flinkYarnClient.getTaskManagerSlots();
+			if(yarnTmSlots == -1) {
+				yarnTmSlots = 1;
+			}
+			maxSlots = yarnTmSlots * flinkYarnClient.getTaskManagerCount();
+			if(userParallelism != -1) {
+				int slotsPerTM = userParallelism / flinkYarnClient.getTaskManagerCount();
+				logAndSysout("The YARN cluster has "+maxSlots+" slots available, but the user requested a parallelism of "+userParallelism+" on YARN. " +
+						"Each of the "+flinkYarnClient.getTaskManagerCount()+" TaskManagers will get "+slotsPerTM+" slots.");
+				flinkYarnClient.setTaskManagerSlots(slotsPerTM);
+			}
+
 			try {
 				yarnCluster = flinkYarnClient.deploy("Flink Application: " + programName);
+				yarnCluster.connectToCluster();
 			}
 			catch(Exception e) {
 				throw new RuntimeException("Error deploying the YARN cluster", e);
@@ -722,7 +782,7 @@ public class CliFrontend {
 						break;
 					}
 				} else {
-					logAndSysout("No status updates from YARN cluster received so far. Waiting ...");
+					logAndSysout("No status updates from the YARN cluster received so far. Waiting ...");
 				}
 
 				try {
@@ -738,7 +798,7 @@ public class CliFrontend {
 		else {
 			jobManagerAddress = getJobManagerAddress(options);
 		}
-		return new Client(jobManagerAddress, config, classLoader);
+		return new Client(jobManagerAddress, config, classLoader, maxSlots);
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
index 25f31e3..7352457 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
@@ -122,12 +122,6 @@ public class FlinkYarnSessionCli {
 		} else {
 			LOG.info("No path for the flink jar passed. Using the location of "+flinkYarnClient.getClass()+" to locate the jar");
 			localJarPath = new Path("file://"+flinkYarnClient.getClass().getProtectionDomain().getCodeSource().getLocation().getPath());
-			if(!localJarPath.toString().contains("uberjar")) {
-				// we need to have a proper uberjar because otherwise we don't have the required classes available on the cluster.
-				// most likely the user did try to start yarn in a regular hadoop2 flink build (not a yarn package) (using ./bin/flink -m yarn-cluster)
-				LOG.error("The detected jar file '"+localJarPath+"' is not a uberjar.");
-				return null;
-			}
 		}
 
 		flinkYarnClient.setLocalJarPath(localJarPath);
@@ -392,6 +386,10 @@ public class FlinkYarnSessionCli {
 
 			try {
 				yarnCluster = flinkYarnClient.deploy(null);
+				// only connect to cluster if its not a detached session.
+				if(!flinkYarnClient.isDetached()) {
+					yarnCluster.connectToCluster();
+				}
 			} catch (Exception e) {
 				System.err.println("Error while deploying YARN cluster: "+e.getMessage());
 				e.printStackTrace(System.err);
@@ -423,7 +421,7 @@ public class FlinkYarnSessionCli {
 
 			if (detachedMode) {
 				// print info and quit:
-				LOG.info("The Flink YARN client has been started in detached mode. In order to stop" +
+				LOG.info("The Flink YARN client has been started in detached mode. In order to stop " +
 						"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
 						"yarn application -kill "+yarnCluster.getApplicationId()+"\n" +
 						"Please also note that the temporary files of the YARN session in {} will not be removed.",

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index 6df0f79..5ee4e5d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -89,15 +89,20 @@ public class LocalExecutor extends PlanExecutor {
 	}
 	
 	// --------------------------------------------------------------------------------------------
-	
+
+	public static Configuration getConfiguration(LocalExecutor le) {
+		Configuration configuration = new Configuration();
+		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, le.getTaskManagerNumSlots());
+		configuration.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, le.isDefaultOverwriteFiles());
+		return configuration;
+	}
+
 	public void start() throws Exception {
 		synchronized (this.lock) {
 			if (this.flink == null) {
 				
 				// create the embedded runtime
-				Configuration configuration = new Configuration();
-				configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, getTaskManagerNumSlots());
-				configuration.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, isDefaultOverwriteFiles());
+				Configuration configuration = getConfiguration(this);
 				// start it up
 				this.flink = new LocalFlinkMiniCluster(configuration, true);
 			} else {
@@ -158,7 +163,7 @@ public class LocalExecutor extends PlanExecutor {
 			}
 
 			try {
-				Optimizer pc = new Optimizer(new DataStatistics());
+				Optimizer pc = new Optimizer(new DataStatistics(), this.flink.getConfiguration());
 				OptimizedPlan op = pc.compile(plan);
 				
 				JobGraphGenerator jgg = new JobGraphGenerator();
@@ -186,7 +191,7 @@ public class LocalExecutor extends PlanExecutor {
 	 * @throws Exception
 	 */
 	public String getOptimizerPlanAsJSON(Plan plan) throws Exception {
-		Optimizer pc = new Optimizer(new DataStatistics());
+		Optimizer pc = new Optimizer(new DataStatistics(), getConfiguration(this));
 		OptimizedPlan op = pc.compile(plan);
 		PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();
 	
@@ -242,7 +247,7 @@ public class LocalExecutor extends PlanExecutor {
 		LocalExecutor exec = new LocalExecutor();
 		try {
 			exec.start();
-			Optimizer pc = new Optimizer(new DataStatistics());
+			Optimizer pc = new Optimizer(new DataStatistics(), exec.flink.getConfiguration());
 			OptimizedPlan op = pc.compile(plan);
 			PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index 74a7ddb..1759b65 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -27,6 +27,7 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.PlanExecutor;
 import org.apache.flink.client.program.Client;
@@ -35,11 +36,13 @@ import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class RemoteExecutor extends PlanExecutor {
+	private static final Logger LOG = LoggerFactory.getLogger(RemoteExecutor.class);
 
 	private final List<String> jarFiles;
-	
 	private final InetSocketAddress address;
 	
 	public RemoteExecutor(String hostname, int port) {
@@ -86,22 +89,34 @@ public class RemoteExecutor extends PlanExecutor {
 	}
 	
 	public JobExecutionResult executePlanWithJars(JobWithJars p) throws Exception {
-		Client c = new Client(this.address, new Configuration(), p.getUserCodeClassLoader());
-		return c.run(p, -1, true);
+		Client c = new Client(this.address, new Configuration(), p.getUserCodeClassLoader(), -1);
+		JobSubmissionResult result = c.run(p, -1, true);
+		if(result instanceof JobExecutionResult) {
+			return (JobExecutionResult) result;
+		} else {
+			LOG.warn("The Client didn't return a JobExecutionResult");
+			return new JobExecutionResult(result.getJobID(), -1, null);
+		}
 	}
 
 	public JobExecutionResult executeJar(String jarPath, String assemblerClass, String... args) throws Exception {
 		File jarFile = new File(jarPath);
 		PackagedProgram program = new PackagedProgram(jarFile, assemblerClass, args);
 		
-		Client c = new Client(this.address, new Configuration(), program.getUserCodeClassLoader());
-		return c.run(program.getPlanWithJars(), -1, true);
+		Client c = new Client(this.address, new Configuration(), program.getUserCodeClassLoader(), -1);
+		JobSubmissionResult result = c.run(program.getPlanWithJars(), -1, true);
+		if(result instanceof JobExecutionResult) {
+			return (JobExecutionResult) result;
+		} else {
+			LOG.warn("The Client didn't return a JobExecutionResult");
+			return new JobExecutionResult(result.getJobID(), -1, null);
+		}
 	}
 
 	@Override
 	public String getOptimizerPlanAsJSON(Plan plan) throws Exception {
 		JobWithJars p = new JobWithJars(plan, this.jarFiles);
-		Client c = new Client(this.address, new Configuration(), p.getUserCodeClassLoader());
+		Client c = new Client(this.address, new Configuration(), p.getUserCodeClassLoader(), -1);
 		
 		OptimizedPlan op = (OptimizedPlan) c.getOptimizedPlan(p, -1);
 		PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index 9a578bc..6dff9e2 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -25,6 +25,8 @@ import java.io.PrintStream;
 import java.net.InetSocketAddress;
 import java.util.List;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.Plan;
@@ -69,6 +71,19 @@ public class Client {
 	private final Optimizer compiler;		// the compiler to compile the jobs
 	
 	private boolean printStatusDuringExecution = false;
+
+	/**
+	 * If != -1, this field specifies the total number of available slots on the cluster
+	 * connected to the client.
+	 */
+	private int maxSlots = -1;
+
+	/**
+	 * ID of the last job submitted with this client.
+	 */
+	private JobID lastJobId = null;
+
+	private ClassLoader userCodeClassLoader; // TODO: use userCodeClassloader to deserialize accumulator results.
 	
 	// ------------------------------------------------------------------------
 	//                            Construction
@@ -80,7 +95,7 @@ public class Client {
 	 * 
 	 * @param jobManagerAddress Address and port of the job-manager.
 	 */
-	public Client(InetSocketAddress jobManagerAddress, Configuration config, ClassLoader userCodeClassLoader) {
+	public Client(InetSocketAddress jobManagerAddress, Configuration config, ClassLoader userCodeClassLoader, int maxSlots) {
 		Preconditions.checkNotNull(config, "Configuration is null");
 		this.configuration = config;
 		
@@ -88,7 +103,9 @@ public class Client {
 		configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerAddress.getAddress().getHostAddress());
 		configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerAddress.getPort());
 		
-		this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator());
+		this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), configuration);
+		this.userCodeClassLoader = userCodeClassLoader;
+		this.maxSlots = maxSlots;
 	}
 
 	/**
@@ -112,7 +129,8 @@ public class Client {
 			throw new CompilerException("Cannot find port to job manager's RPC service in the global configuration.");
 		}
 
-		this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator());
+		this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), configuration);
+		this.userCodeClassLoader = userCodeClassLoader;
 	}
 	
 	public void setPrintStatusDuringExecution(boolean print) {
@@ -126,6 +144,14 @@ public class Client {
 	public int getJobManagerPort() {
 		return this.configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
 	}
+
+	/**
+	 * @return -1 if unknown. The maximum number of available processing slots at the Flink cluster
+	 * connected to this client.
+	 */
+	public int getMaxSlots() {
+		return this.maxSlots;
+	}
 	
 	// ------------------------------------------------------------------------
 	//                      Compilation and Submission
@@ -191,8 +217,10 @@ public class Client {
 	
 	public FlinkPlan getOptimizedPlan(Plan p, int parallelism) throws CompilerException {
 		if (parallelism > 0 && p.getDefaultParallelism() <= 0) {
+			LOG.debug("Changing plan default parallelism from {} to {}",p.getDefaultParallelism(), parallelism);
 			p.setDefaultParallelism(parallelism);
 		}
+		LOG.debug("Set parallelism {}, plan default parallelism {}", parallelism, p.getDefaultParallelism());
 
 		return this.compiler.compile(p);
 	}
@@ -230,49 +258,31 @@ public class Client {
 		return job;
 	}
 
-	public JobExecutionResult run(final PackagedProgram prog, int parallelism, boolean wait) throws ProgramInvocationException {
+	public JobSubmissionResult run(final PackagedProgram prog, int parallelism, boolean wait) throws ProgramInvocationException {
 		Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
 		if (prog.isUsingProgramEntryPoint()) {
 			return run(prog.getPlanWithJars(), parallelism, wait);
 		}
 		else if (prog.isUsingInteractiveMode()) {
-			
-			ContextEnvironment.setAsContext(this, prog.getAllLibraries(), prog.getUserCodeClassLoader(), parallelism);
+			LOG.info("Starting program in interactive mode");
+			ContextEnvironment.setAsContext(this, prog.getAllLibraries(), prog.getUserCodeClassLoader(), parallelism, wait);
 			ContextEnvironment.enableLocalExecution(false);
-			if (wait) {
-				// invoke here
-				try {
-					prog.invokeInteractiveModeForExecution();
-				}
-				finally {
-					ContextEnvironment.enableLocalExecution(true);
-				}
+			// invoke here
+			try {
+				prog.invokeInteractiveModeForExecution();
 			}
-			else {
-				// invoke in the background
-				Thread backGroundRunner = new Thread("Program Runner") {
-					public void run() {
-						try {
-							prog.invokeInteractiveModeForExecution();
-						}
-						catch (Throwable t) {
-							LOG.error("The program execution failed.", t);
-						}
-						finally {
-							ContextEnvironment.enableLocalExecution(true);
-						}
-					}
-				};
-				backGroundRunner.start();
+			finally {
+				ContextEnvironment.enableLocalExecution(true);
 			}
-			return null;
+
+			return new JobSubmissionResult(lastJobId);
 		}
 		else {
 			throw new RuntimeException();
 		}
 	}
 	
-	public JobExecutionResult run(PackagedProgram prog, OptimizedPlan optimizedPlan, boolean wait) throws ProgramInvocationException {
+	public JobSubmissionResult run(PackagedProgram prog, OptimizedPlan optimizedPlan, boolean wait) throws ProgramInvocationException {
 		return run(optimizedPlan, prog.getAllLibraries(), wait);
 
 	}
@@ -291,17 +301,18 @@ public class Client {
 	 *                                    i.e. the job-manager is unreachable, or due to the fact that the
 	 *                                    parallel execution failed.
 	 */
-	public JobExecutionResult run(JobWithJars prog, int parallelism, boolean wait) throws CompilerException, ProgramInvocationException {
+	public JobSubmissionResult run(JobWithJars prog, int parallelism, boolean wait) throws CompilerException, ProgramInvocationException {
 		return run((OptimizedPlan) getOptimizedPlan(prog, parallelism), prog.getJarFiles(), wait);
 	}
 	
 
-	public JobExecutionResult run(OptimizedPlan compiledPlan, List<File> libraries, boolean wait) throws ProgramInvocationException {
+	public JobSubmissionResult run(OptimizedPlan compiledPlan, List<File> libraries, boolean wait) throws ProgramInvocationException {
 		JobGraph job = getJobGraph(compiledPlan, libraries);
+		this.lastJobId = job.getJobID();
 		return run(job, wait);
 	}
 
-	public JobExecutionResult run(JobGraph jobGraph, boolean wait) throws ProgramInvocationException {
+	public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws ProgramInvocationException {
 
 		final String hostname = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
 		if (hostname == null) {
@@ -335,6 +346,8 @@ public class Client {
 			}
 			else {
 				JobClient.submitJobDetached(jobGraph, client, timeout);
+				// return a "Fake" execution result with the JobId
+				return new JobSubmissionResult(jobGraph.getJobID());
 			}
 		}
 		catch (JobExecutionException e) {
@@ -347,8 +360,6 @@ public class Client {
 			actorSystem.shutdown();
 			actorSystem.awaitTermination();
 		}
-
-		return new JobExecutionResult(-1, null);
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index 8d5fe17..55b579a 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -22,37 +22,51 @@ import java.io.File;
 import java.util.List;
 
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironmentFactory;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- *
+ * Execution Environment for remote execution with the Client.
  */
 public class ContextEnvironment extends ExecutionEnvironment {
-	
+
+	private static final Logger LOG = LoggerFactory.getLogger(ContextEnvironment.class);
+
 	private final Client client;
 	
 	private final List<File> jarFilesToAttach;
 	
 	private final ClassLoader userCodeClassLoader;
+
+	private final boolean wait;
 	
 	
 	
-	public ContextEnvironment(Client remoteConnection, List<File> jarFiles, ClassLoader userCodeClassLoader) {
+	public ContextEnvironment(Client remoteConnection, List<File> jarFiles, ClassLoader userCodeClassLoader, boolean wait) {
 		this.client = remoteConnection;
 		this.jarFilesToAttach = jarFiles;
 		this.userCodeClassLoader = userCodeClassLoader;
+		this.wait = wait;
 	}
 
 	@Override
 	public JobExecutionResult execute(String jobName) throws Exception {
 		Plan p = createProgramPlan(jobName);
 		JobWithJars toRun = new JobWithJars(p, this.jarFilesToAttach, this.userCodeClassLoader);
-		
-		return this.client.run(toRun, getParallelism(), true);
+
+		JobSubmissionResult result = this.client.run(toRun, getParallelism(), wait);
+		if(result instanceof JobExecutionResult) {
+			return (JobExecutionResult) result;
+		} else {
+			LOG.warn("The Client didn't return a JobExecutionResult");
+			return new JobExecutionResult(result.getJobID(), -1, null);
+		}
 	}
 
 	@Override
@@ -60,7 +74,7 @@ public class ContextEnvironment extends ExecutionEnvironment {
 		Plan p = createProgramPlan("unnamed job");
 		
 		OptimizedPlan op = (OptimizedPlan) this.client.getOptimizedPlan(p, getParallelism());
-		
+
 		PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();
 		return gen.getOptimizerPlanAsJSON(op);
 	}
@@ -83,15 +97,15 @@ public class ContextEnvironment extends ExecutionEnvironment {
 	// --------------------------------------------------------------------------------------------
 	
 	static void setAsContext(Client client, List<File> jarFilesToAttach, 
-				ClassLoader userCodeClassLoader, int defaultParallelism)
+				ClassLoader userCodeClassLoader, int defaultParallelism, boolean wait)
 	{
-		initializeContextEnvironment(new ContextEnvironmentFactory(client, jarFilesToAttach, userCodeClassLoader, defaultParallelism));
+		initializeContextEnvironment(new ContextEnvironmentFactory(client, jarFilesToAttach, userCodeClassLoader, defaultParallelism, wait));
 	}
 	
 	protected static void enableLocalExecution(boolean enabled) {
 		ExecutionEnvironment.enableLocalExecution(enabled);
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	
 	public static class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
@@ -103,20 +117,23 @@ public class ContextEnvironment extends ExecutionEnvironment {
 		private final ClassLoader userCodeClassLoader;
 		
 		private final int defaultParallelism;
+
+		private final boolean wait;
 		
 
 		public ContextEnvironmentFactory(Client client, List<File> jarFilesToAttach, 
-				ClassLoader userCodeClassLoader, int defaultParallelism)
+				ClassLoader userCodeClassLoader, int defaultParallelism, boolean wait)
 		{
 			this.client = client;
 			this.jarFilesToAttach = jarFilesToAttach;
 			this.userCodeClassLoader = userCodeClassLoader;
 			this.defaultParallelism = defaultParallelism;
+			this.wait = wait;
 		}
 		
 		@Override
 		public ExecutionEnvironment createExecutionEnvironment() {
-			ContextEnvironment env = new ContextEnvironment(client, jarFilesToAttach, userCodeClassLoader);
+			ContextEnvironment env = new ContextEnvironment(client, jarFilesToAttach, userCodeClassLoader, wait);
 			if (defaultParallelism > 0) {
 				env.setParallelism(defaultParallelism);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java
index 6d54b58..cb2585d 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java
@@ -105,7 +105,7 @@ public class CliFrontendInfoTest {
 		}
 
 		@Override
-		protected Client getClient(CommandLineOptions options, ClassLoader loader, String programName)
+		protected Client getClient(CommandLineOptions options, ClassLoader loader, String programName, int par)
 				throws Exception
 		{
 			return new TestClient(expectedDop);
@@ -118,7 +118,7 @@ public class CliFrontendInfoTest {
 		
 		private TestClient(int expectedDop) throws Exception {
 			super(new InetSocketAddress(InetAddress.getLocalHost(), 6176),
-					new Configuration(), CliFrontendInfoTest.class.getClassLoader());
+					new Configuration(), CliFrontendInfoTest.class.getClassLoader(), -1);
 			
 			this.expectedDop = expectedDop;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
index 2712c19..3224e0f 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
@@ -22,7 +22,7 @@ import akka.actor.*;
 import akka.testkit.JavaTestKit;
 
 import org.apache.flink.client.cli.CommandLineOptions;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
index 8ed99f9..034ee4e 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
@@ -93,7 +93,7 @@ public class CliFrontendRunTest {
 		}
 
 		@Override
-		protected int executeProgram(PackagedProgram program, Client client, int parallelism) {
+		protected int executeProgram(PackagedProgram program, Client client, int parallelism, boolean wait) {
 			assertEquals(this.expectedParallelim, parallelism);
 			return 0;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
index 09a3e1a..be6c19a 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
@@ -91,7 +91,7 @@ public class ClientConnectionTest {
 			vertex.setInvokableClass(TestInvokable.class);
 
 			final JobGraph jg = new JobGraph("Test Job", vertex);
-			final Client client = new Client(unreachableEndpoint, config, getClass().getClassLoader());
+			final Client client = new Client(unreachableEndpoint, config, getClass().getClassLoader(), -1);
 
 			final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 9278c7a..22865ed 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -24,6 +24,7 @@ import akka.actor.Status;
 import akka.actor.UntypedActor;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.optimizer.DataStatistics;
@@ -35,7 +36,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.net.NetUtils;
 import org.junit.After;
@@ -101,7 +102,7 @@ public class ClientTest {
 		when(program.getPlanWithJars()).thenReturn(planWithJarsMock);
 		when(planWithJarsMock.getPlan()).thenReturn(planMock);
 
-		whenNew(Optimizer.class).withArguments(any(DataStatistics.class), any(CostEstimator.class)).thenReturn(this.compilerMock);
+		whenNew(Optimizer.class).withArguments(any(DataStatistics.class), any(CostEstimator.class), any(Configuration.class)).thenReturn(this.compilerMock);
 		when(compilerMock.compile(planMock)).thenReturn(optimizedPlanMock);
 
 		whenNew(JobGraphGenerator.class).withNoArguments().thenReturn(generatorMock);
@@ -139,11 +140,9 @@ public class ClientTest {
 			jobManagerSystem.actorOf(Props.create(SuccessReturningActor.class), JobManager.JOB_MANAGER_NAME());
 
 			Client out = new Client(config, getClass().getClassLoader());
-			JobExecutionResult result = out.run(program.getPlanWithJars(), -1, false);
+			JobSubmissionResult result = out.run(program.getPlanWithJars(), -1, false);
 
 			assertNotNull(result);
-			assertEquals(-1, result.getNetRuntime());
-			assertNull(result.getAllAccumulatorResults());
 
 			program.deleteExtractedLibraries();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
index e2b7935..f156f77 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
@@ -71,7 +71,8 @@ public class ExecutionPlanAfterExecutionTest implements java.io.Serializable {
 			env.getExecutionPlan();
 			env.createProgramPlan();
 		} catch (Exception e) {
-			fail("Cannot run both #getExecutionPlan and #execute.");
+			e.printStackTrace();
+			fail("Cannot run both #getExecutionPlan and #execute. Message: "+e.getMessage());
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
index 8aecff3..67b406d 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
@@ -44,7 +44,7 @@ public class ExecutionPlanCreationTest {
 			InetAddress mockAddress = InetAddress.getLocalHost();
 			InetSocketAddress mockJmAddress = new InetSocketAddress(mockAddress, 12345);
 			
-			Client client = new Client(mockJmAddress, new Configuration(), getClass().getClassLoader());
+			Client client = new Client(mockJmAddress, new Configuration(), getClass().getClassLoader(), -1);
 			OptimizedPlan op = (OptimizedPlan) client.getOptimizedPlan(prg, -1);
 			assertNotNull(op);
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java b/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
index 406cfe9..68506ae 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
@@ -24,7 +24,7 @@ import java.util.Map;
  * The result of a job execution. Gives access to the execution time of the job,
  * and to all accumulators created by this job.
  */
-public class JobExecutionResult {
+public class JobExecutionResult extends JobSubmissionResult {
 
 	private long netRuntime;
 	private Map<String, Object> accumulatorResults;
@@ -32,10 +32,12 @@ public class JobExecutionResult {
 	/**
 	 * Creates a new JobExecutionResult.
 	 *
+	 * @param jobID The ID of the job that produced this result.
 	 * @param netRuntime The net runtime of the job (excluding pre-flight phase like the optimizer)
 	 * @param accumulators A map of all accumulators produced by the job.
 	 */
-	public JobExecutionResult(long netRuntime, Map<String, Object> accumulators) {
+	public JobExecutionResult(JobID jobID, long netRuntime, Map<String, Object> accumulators) {
+		super(jobID);
 		this.netRuntime = netRuntime;
 		this.accumulatorResults = accumulators;
 	}
@@ -92,5 +94,6 @@ public class JobExecutionResult {
 		return (Integer) result;
 	}
 
+
 	// TODO Create convenience methods for the other shipped accumulator types
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-core/src/main/java/org/apache/flink/api/common/JobID.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/JobID.java b/flink-core/src/main/java/org/apache/flink/api/common/JobID.java
new file mode 100644
index 0000000..7478da4
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/JobID.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common;
+
+import javax.xml.bind.DatatypeConverter;
+import org.apache.flink.util.AbstractID;
+import java.nio.ByteBuffer;
+
+/**
+ * Unique Job Identifier
+ */
+public final class JobID extends AbstractID {
+
+	private static final long serialVersionUID = 1L;
+	
+	public JobID() {
+		super();
+	}
+
+	public JobID(long lowerPart, long upperPart) {
+		super(lowerPart, upperPart);
+	}
+
+	public JobID(byte[] bytes) {
+		super(bytes);
+	}
+
+	public static JobID generate() {
+		return new JobID();
+	}
+
+	public static JobID fromByteArray(byte[] bytes) {
+		return new JobID(bytes);
+	}
+
+	public static JobID fromByteBuffer(ByteBuffer buf) {
+		long lower = buf.getLong();
+		long upper = buf.getLong();
+		return new JobID(lower, upper);
+	}
+
+	public static JobID fromHexString(String hexString) {
+		return new JobID(DatatypeConverter.parseHexBinary(hexString));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java b/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java
new file mode 100644
index 0000000..5cea9d5
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common;
+
+/**
+ * The result of a job submission.
+ * Contains the JobID
+ */
+public class JobSubmissionResult {
+	private JobID jobID;
+
+	public JobSubmissionResult(JobID jobID) {
+		this.jobID = jobID;
+	}
+
+	/**
+	 * Returns the JobID assigned to the job by the Flink runtime.
+	 *
+	 * @return jobID, or null if the job has been executed on a runtime without JobIDs or if the execution failed.
+	 */
+	public JobID getJobID() {
+		return jobID;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
index 78ad930..f605113 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
@@ -100,7 +100,7 @@ public class CollectionExecutor {
 		
 		long endTime = System.currentTimeMillis();
 		Map<String, Object> accumulatorResults = AccumulatorHelper.toResultMap(accumulators);
-		return new JobExecutionResult(endTime - startTime, accumulatorResults);
+		return new JobExecutionResult(null, endTime - startTime, accumulatorResults);
 	}
 	
 	private List<?> execute(Operator<?> operator) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-dist/src/main/flink-bin/conf/log4j-cli.properties
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/conf/log4j-cli.properties b/flink-dist/src/main/flink-bin/conf/log4j-cli.properties
index 34ebbff..9c56e61 100644
--- a/flink-dist/src/main/flink-bin/conf/log4j-cli.properties
+++ b/flink-dist/src/main/flink-bin/conf/log4j-cli.properties
@@ -29,6 +29,7 @@ log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m
 # Log output from org.apache.flink.yarn to the console. This is used by the
 # CliFrontend class when using a per-job YARN cluster.
 log4j.logger.org.apache.flink.yarn=INFO, console
+log4j.logger.org.apache.flink.client.FlinkYarnSessionCli=INFO, console
 log4j.logger.org.apache.hadoop=INFO, console
 
 log4j.appender.console=org.apache.log4j.ConsoleAppender

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
index 07aa2ce..bfd5e85 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
@@ -54,7 +54,6 @@ public class WordCount {
 	
 	public static void main(String[] args) throws Exception {
 
-
 		if(!parseParameters(args)) {
 			return;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/pom.xml
----------------------------------------------------------------------
diff --git a/flink-optimizer/pom.xml b/flink-optimizer/pom.xml
index 55764e9..4d37aea 100644
--- a/flink-optimizer/pom.xml
+++ b/flink-optimizer/pom.xml
@@ -52,7 +52,7 @@ under the License.
 			<artifactId>flink-java</artifactId>
 			<version>${project.version}</version>
 		</dependency>
-		
+
 		<dependency>
 			<groupId>com.google.guava</groupId>
 			<artifactId>guava</artifactId>
@@ -60,4 +60,21 @@ under the License.
 		</dependency>
 	</dependencies>
 
+	<!-- Because flink-tests needs the CompilerTestBase -->
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java
index c80cfc2..c4d70f6 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.optimizer.traversals.BinaryUnionReplacer;
 import org.apache.flink.optimizer.traversals.BranchesVisitor;
 import org.apache.flink.optimizer.traversals.GraphCreatingVisitor;
@@ -45,7 +46,6 @@ import org.apache.flink.optimizer.plan.SinkJoinerPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.postpass.OptimizerPostPass;
 import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.util.InstantiationUtil;
 
 /**
@@ -303,8 +303,8 @@ public class Optimizer {
 	 * unknown sizes and hence use only the heuristic cost functions, which result in the selection
 	 * of the most robust execution strategies.
 	 */
-	public Optimizer() {
-		this(null, new DefaultCostEstimator());
+	public Optimizer(Configuration config) {
+		this(null, new DefaultCostEstimator(), config);
 	}
 
 	/**
@@ -314,8 +314,8 @@ public class Optimizer {
 	 * @param stats
 	 *        The statistics to be used to determine the input properties.
 	 */
-	public Optimizer(DataStatistics stats) {
-		this(stats, new DefaultCostEstimator());
+	public Optimizer(DataStatistics stats, Configuration config) {
+		this(stats, new DefaultCostEstimator(), config);
 	}
 
 	/**
@@ -328,8 +328,8 @@ public class Optimizer {
 	 * 
 	 * @param estimator The cost estimator to use to cost the individual operations.
 	 */
-	public Optimizer(CostEstimator estimator) {
-		this(null, estimator);
+	public Optimizer(CostEstimator estimator, Configuration config) {
+		this(null, estimator, config);
 	}
 
 	/**
@@ -343,17 +343,17 @@ public class Optimizer {
 	 * @param estimator
 	 *        The <tt>CostEstimator</tt> to use to cost the individual operations.
 	 */
-	public Optimizer(DataStatistics stats, CostEstimator estimator) {
+	public Optimizer(DataStatistics stats, CostEstimator estimator, Configuration config) {
 		this.statistics = stats;
 		this.costEstimator = estimator;
 
 		// determine the default parallelism
 		// check for old key string first, then for new one
-		this.defaultParallelism = GlobalConfiguration.getInteger(
+		this.defaultParallelism = config.getInteger(
 				ConfigConstants.DEFAULT_PARALLELISM_KEY_OLD,
 				ConfigConstants.DEFAULT_PARALLELISM);
 		// now check for new one which overwrites old values
-		this.defaultParallelism = GlobalConfiguration.getInteger(
+		this.defaultParallelism = config.getInteger(
 				ConfigConstants.DEFAULT_PARALLELISM_KEY,
 				this.defaultParallelism);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java
index 1e4bafb..0c50536 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.optimizer.util.DummyCrossStub;
 import org.apache.flink.optimizer.util.DummyInputFormat;
 import org.apache.flink.optimizer.util.DummyOutputFormat;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
index 2df08a0..94ff41a 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
 import org.apache.flink.optimizer.testfunctions.SelectOneReducer;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java
index c7ad2da..57c53ff 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java
@@ -22,6 +22,7 @@ package org.apache.flink.optimizer;
 import static org.junit.Assert.*;
 
 import org.apache.flink.api.common.Plan;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
index 47efeb1..1a4cd18 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
@@ -22,6 +22,7 @@ package org.apache.flink.optimizer;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.operators.GenericDataSourceBase;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java
index eba07f1..61d407a 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.optimizer;
 
 import org.apache.flink.api.common.functions.RichCoGroupFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Assert;
 import org.junit.Test;
 import org.apache.flink.api.common.Plan;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
deleted file mode 100644
index 4eed236..0000000
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
-import org.apache.flink.api.common.operators.GenericDataSourceBase;
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.base.BulkIterationBase;
-import org.apache.flink.optimizer.costs.DefaultCostEstimator;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.util.OperatingSystem;
-import org.apache.flink.util.Visitor;
-import org.junit.Before;
-
-/**
- * Base class for Optimizer tests. Offers utility methods to trigger optimization
- * of a program and to fetch the nodes in an optimizer plan that correspond
- * the the node in the program plan.
- */
-public abstract class CompilerTestBase implements java.io.Serializable {
-	
-	private static final long serialVersionUID = 1L;
-
-	protected static final String IN_FILE = OperatingSystem.isWindows() ? "file:/c:/" : "file:///dev/random";
-	
-	protected static final String OUT_FILE = OperatingSystem.isWindows() ? "file:/c:/" : "file:///dev/null";
-	
-	protected static final int DEFAULT_PARALLELISM = 8;
-	
-	private static final String CACHE_KEY = "cachekey";
-	
-	// ------------------------------------------------------------------------
-	
-	protected transient DataStatistics dataStats;
-	
-	protected transient Optimizer withStatsCompiler;
-	
-	protected transient Optimizer noStatsCompiler;
-	
-	private transient int statCounter;
-	
-	// ------------------------------------------------------------------------	
-	
-	@Before
-	public void setup() {
-		this.dataStats = new DataStatistics();
-		this.withStatsCompiler = new Optimizer(this.dataStats, new DefaultCostEstimator());
-		this.withStatsCompiler.setDefaultParallelism(DEFAULT_PARALLELISM);
-		
-		this.noStatsCompiler = new Optimizer(null, new DefaultCostEstimator());
-		this.noStatsCompiler.setDefaultParallelism(DEFAULT_PARALLELISM);
-	}
-	
-	// ------------------------------------------------------------------------
-	
-	public OptimizedPlan compileWithStats(Plan p) {
-		return this.withStatsCompiler.compile(p);
-	}
-	
-	public OptimizedPlan compileNoStats(Plan p) {
-		return this.noStatsCompiler.compile(p);
-	}
-	
-	public void setSourceStatistics(GenericDataSourceBase<?, ?> source, long size, float recordWidth) {
-		setSourceStatistics(source, new FileBaseStatistics(Long.MAX_VALUE, size, recordWidth));
-	}
-	
-	public void setSourceStatistics(GenericDataSourceBase<?, ?> source, FileBaseStatistics stats) {
-		final String key = CACHE_KEY + this.statCounter++;
-		this.dataStats.cacheBaseStatistics(stats, key);
-		source.setStatisticsKey(key);
-	}
-	
-	// ------------------------------------------------------------------------
-	
-	public static OptimizerPlanNodeResolver getOptimizerPlanNodeResolver(OptimizedPlan plan) {
-		return new OptimizerPlanNodeResolver(plan);
-	}
-	
-	public static final class OptimizerPlanNodeResolver {
-		
-		private final Map<String, ArrayList<PlanNode>> map;
-		
-		OptimizerPlanNodeResolver(OptimizedPlan p) {
-			HashMap<String, ArrayList<PlanNode>> map = new HashMap<String, ArrayList<PlanNode>>();
-			
-			for (PlanNode n : p.getAllNodes()) {
-				Operator<?> c = n.getOriginalOptimizerNode().getOperator();
-				String name = c.getName();
-				
-				ArrayList<PlanNode> list = map.get(name);
-				if (list == null) {
-					list = new ArrayList<PlanNode>(2);
-					map.put(name, list);
-				}
-				
-				// check whether this node is a child of a node with the same contract (aka combiner)
-				boolean shouldAdd = true;
-				for (Iterator<PlanNode> iter = list.iterator(); iter.hasNext();) {
-					PlanNode in = iter.next();
-					if (in.getOriginalOptimizerNode().getOperator() == c) {
-						// is this the child or is our node the child
-						if (in instanceof SingleInputPlanNode && n instanceof SingleInputPlanNode) {
-							SingleInputPlanNode thisNode = (SingleInputPlanNode) n;
-							SingleInputPlanNode otherNode = (SingleInputPlanNode) in;
-							
-							if (thisNode.getPredecessor() == otherNode) {
-								// other node is child, remove it
-								iter.remove();
-							} else if (otherNode.getPredecessor() == thisNode) {
-								shouldAdd = false;
-							}
-						} else {
-							throw new RuntimeException("Unrecodnized case in test.");
-						}
-					}
-				}
-				
-				if (shouldAdd) {
-					list.add(n);
-				}
-			}
-			
-			this.map = map;
-		}
-		
-		
-		@SuppressWarnings("unchecked")
-		public <T extends PlanNode> T getNode(String name) {
-			List<PlanNode> nodes = this.map.get(name);
-			if (nodes == null || nodes.isEmpty()) {
-				throw new RuntimeException("No node found with the given name.");
-			} else if (nodes.size() != 1) {
-				throw new RuntimeException("Multiple nodes found with the given name.");
-			} else {
-				return (T) nodes.get(0);
-			}
-		}
-		
-		@SuppressWarnings("unchecked")
-		public <T extends PlanNode> T getNode(String name, Class<? extends Function> stubClass) {
-			List<PlanNode> nodes = this.map.get(name);
-			if (nodes == null || nodes.isEmpty()) {
-				throw new RuntimeException("No node found with the given name and stub class.");
-			} else {
-				PlanNode found = null;
-				for (PlanNode node : nodes) {
-					if (node.getClass() == stubClass) {
-						if (found == null) {
-							found = node;
-						} else {
-							throw new RuntimeException("Multiple nodes found with the given name and stub class.");
-						}
-					}
-				}
-				if (found == null) {
-					throw new RuntimeException("No node found with the given name and stub class.");
-				} else {
-					return (T) found;
-				}
-			}
-		}
-		
-		public List<PlanNode> getNodes(String name) {
-			List<PlanNode> nodes = this.map.get(name);
-			if (nodes == null || nodes.isEmpty()) {
-				throw new RuntimeException("No node found with the given name.");
-			} else {
-				return new ArrayList<PlanNode>(nodes);
-			}
-		}
-	}
-
-	/**
-	 * Collects all DataSources of a plan to add statistics
-	 *
-	 */
-	public static class SourceCollectorVisitor implements Visitor<Operator<?>> {
-		
-		protected final List<GenericDataSourceBase<?, ?>> sources = new ArrayList<GenericDataSourceBase<?, ?>>(4);
-
-		@Override
-		public boolean preVisit(Operator<?> visitable) {
-			
-			if(visitable instanceof GenericDataSourceBase) {
-				sources.add((GenericDataSourceBase<?, ?>) visitable);
-			}
-			else if(visitable instanceof BulkIterationBase) {
-				((BulkIterationBase<?>) visitable).getNextPartialSolution().accept(this);
-			}
-			
-			return true;
-		}
-
-		@Override
-		public void postVisit(Operator<?> visitable) {}
-		
-		public List<GenericDataSourceBase<?, ?>> getSources() {
-			return this.sources;
-		}
-		
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
index aaee975..bb3aa47 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 
 @SuppressWarnings("serial")

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
index 3b7eae7..7865861 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.plan.SourcePlanNode;
 import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/GroupOrderTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/GroupOrderTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/GroupOrderTest.java
index 77d185d..76b3b0e 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/GroupOrderTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/GroupOrderTest.java
@@ -39,6 +39,7 @@ import org.apache.flink.optimizer.util.DummyOutputFormat;
 import org.apache.flink.optimizer.util.IdentityReduce;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.types.DoubleValue;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.LongValue;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java
index 6dadc19..52e9a2d 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.optimizer.util.DummyInputFormat;
 import org.apache.flink.optimizer.util.DummyOutputFormat;
 import org.apache.flink.optimizer.util.IdentityMap;
 import org.apache.flink.optimizer.util.IdentityReduce;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.types.IntValue;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
index 810ec0e..0afbe93 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.optimizer;
 import static org.junit.Assert.*;
 
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.DataSet;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java
index e65758f..34fc085 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
 import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
 import org.apache.flink.optimizer.testfunctions.IdentityMapper;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 
 @SuppressWarnings({"serial", "unchecked"})

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/ParallelismChangeTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ParallelismChangeTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ParallelismChangeTest.java
index a54136a..8236f10 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ParallelismChangeTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ParallelismChangeTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.optimizer.util.DummyMatchStub;
 import org.apache.flink.optimizer.util.DummyOutputFormat;
 import org.apache.flink.optimizer.util.IdentityMap;
 import org.apache.flink.optimizer.util.IdentityReduce;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Assert;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.record.operators.FileDataSink;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java
index 2b42f85..72effc1 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 
 @SuppressWarnings("serial")

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
index 16684dc..f42eb02 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
index 31f71d1..84f6377 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.*;
 
 import org.apache.flink.optimizer.testfunctions.IdentityMapper;
 import org.apache.flink.optimizer.testfunctions.SelectOneReducer;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.DataSet;