Posted to commits@flink.apache.org by mx...@apache.org on 2015/09/22 21:57:17 UTC

[1/4] flink git commit: [FLINK-2097] temporarily disable session management API

Repository: flink
Updated Branches:
  refs/heads/master 7984acc6b -> d58caa8ec


[FLINK-2097] temporarily disable session management API


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d58caa8e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d58caa8e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d58caa8e

Branch: refs/heads/master
Commit: d58caa8ec88c348bef540cb5959d80bcf9bd894a
Parents: 71bf2f5
Author: Maximilian Michels <mx...@apache.org>
Authored: Tue Sep 15 15:21:20 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Tue Sep 22 19:55:46 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/api/java/ExecutionEnvironment.java  | 11 +++++++----
 .../java/org/apache/flink/api/java/LocalEnvironment.java |  6 ++++--
 .../org/apache/flink/api/java/RemoteEnvironment.java     |  6 ++++--
 3 files changed, 15 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d58caa8e/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 23b5a57..0f61d88 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -277,10 +277,13 @@ public abstract class ExecutionEnvironment {
 	 * @param timeout The timeout, in seconds.
 	 */
 	public void setSessionTimeout(long timeout) {
-		if (timeout < 0) {
-			throw new IllegalArgumentException("The session timeout must not be less than zero.");
-		}
-		this.sessionTimeout = timeout;
+		throw new IllegalStateException("Support for sessions is currently disabled. " +
+				"It will be enabled in future Flink versions.");
+		// Session management is disabled, revert this commit to enable
+		//if (timeout < 0) {
+		//	throw new IllegalArgumentException("The session timeout must not be less than zero.");
+		//}
+		//this.sessionTimeout = timeout;
 	}
 
 	/**
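
With this change applied, any call to setSessionTimeout() fails fast rather than configuring a feature that is no longer wired up. A minimal sketch of what user code observes while sessions are disabled (hypothetical snippet, not part of the commit):

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    try {
        env.setSessionTimeout(60); // timeout in seconds
    } catch (IllegalStateException e) {
        // expected until session support is re-enabled:
        // "Support for sessions is currently disabled. ..."
    }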

http://git-wip-us.apache.org/repos/asf/flink/blob/d58caa8e/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
index 5fd272b..7c85ed9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
@@ -78,8 +78,10 @@ public class LocalEnvironment extends ExecutionEnvironment {
 		}
 
 		Plan p = createProgramPlan(jobName);
-		p.setJobId(jobID);
-		p.setSessionTimeout(sessionTimeout);
+
+		// Session management is disabled, revert this commit to enable
+		//p.setJobId(jobID);
+		//p.setSessionTimeout(sessionTimeout);
 
 		JobExecutionResult result = executor.executePlan(p);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d58caa8e/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
index 6ae1f26..63f59d3 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
@@ -86,8 +86,10 @@ public class RemoteEnvironment extends ExecutionEnvironment {
 		ensureExecutorCreated();
 
 		Plan p = createProgramPlan(jobName);
-		p.setJobId(jobID);
-		p.setSessionTimeout(sessionTimeout);
+
+		// Session management is disabled, revert this commit to enable
+		//p.setJobId(jobID);
+		//p.setSessionTimeout(sessionTimeout);
 
 		JobExecutionResult result = executor.executePlan(p);
 


[3/4] flink git commit: [FLINK-2097][core] implement a job session management

Posted by mx...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java
new file mode 100644
index 0000000..c5ced37
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironmentFactory;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.dag.DataSinkNode;
+
+import java.util.List;
+
+/**
+ * Environment to extract the pre-optimized plan.
+ */
+public final class PreviewPlanEnvironment extends ExecutionEnvironment {
+
+	List<DataSinkNode> previewPlan;
+
+	String preview;
+
+	Plan plan;
+
+	@Override
+	public JobExecutionResult execute(String jobName) throws Exception {
+		this.plan = createProgramPlan(jobName);
+		this.previewPlan = Optimizer.createPreOptimizedPlan(plan);
+
+		// do not go on with anything now!
+		throw new OptimizerPlanEnvironment.ProgramAbortException();
+	}
+
+	@Override
+	public String getExecutionPlan() throws Exception {
+		Plan plan = createProgramPlan("unused");
+		this.previewPlan = Optimizer.createPreOptimizedPlan(plan);
+
+		// do not go on with anything now!
+		throw new OptimizerPlanEnvironment.ProgramAbortException();
+	}
+
+	@Override
+	public void startNewSession() throws Exception {
+	}
+
+	public void setAsContext() {
+		ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
+			@Override
+			public ExecutionEnvironment createExecutionEnvironment() {
+				return PreviewPlanEnvironment.this;
+			}
+		};
+		initializeContextEnvironment(factory);
+	}
+
+	public void setPreview(String preview) {
+		this.preview = preview;
+	}
+
+	public Plan getPlan() {
+		return plan;
+	}
+}
\ No newline at end of file
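
The intended flow, as far as it can be read from the class above: install the preview environment as the context, invoke the user program, and treat the ProgramAbortException as the signal that the pre-optimized plan was captured. A sketch under that assumption (the user program class is hypothetical; previewPlan is package-private, so real callers live in the same package):

    PreviewPlanEnvironment env = new PreviewPlanEnvironment();
    env.setAsContext();
    try {
        // the program's execute() call aborts right after capturing the plan
        MyUserProgram.main(new String[0]); // hypothetical user program
    } catch (Throwable t) {
        if (!(t instanceof OptimizerPlanEnvironment.ProgramAbortException)) {
            throw new RuntimeException("program failed before producing a plan", t);
        }
    }
    List<DataSinkNode> preview = env.previewPlan; // pre-optimized data sink nodes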

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java b/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
index e43d7cc..f83b97c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
@@ -1,4 +1,4 @@
-/*
+ /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -54,9 +54,6 @@ import org.slf4j.LoggerFactory;
 
 public class JobSubmissionServlet extends HttpServlet {
 
-	/**
-	 * Serial UID for serialization interoperability.
-	 */
 	private static final long serialVersionUID = 8447312301029847397L;
 
 	// ------------------------------------------------------------------------
@@ -96,7 +93,7 @@ public class JobSubmissionServlet extends HttpServlet {
 	private final Random rand;													// random number generator for UID
 
 	private final CliFrontend cli;
-
+	
 
 
 	public JobSubmissionServlet(CliFrontend cli, File jobDir, File planDir) {
@@ -296,10 +293,11 @@ public class JobSubmissionServlet extends HttpServlet {
 				return;
 			}
 
-			Long uid = null;
+			Long uid;
 			try {
 				uid = Long.parseLong(id);
-			} catch (NumberFormatException nfex) {
+			}
+			catch (NumberFormatException nfex) {
 				showErrorPage(resp, "An invalid id for the job was provided.");
 				return;
 			}
@@ -314,8 +312,8 @@ public class JobSubmissionServlet extends HttpServlet {
 
 			// submit the job
 			try {
-				Client client = new Client(GlobalConfiguration.getConfiguration(), job.f0.getUserCodeClassLoader());
-				client.run(client.getJobGraph(job.f0, job.f1), false);
+				Client client = new Client(GlobalConfiguration.getConfiguration());
+				client.runDetached(Client.getJobGraph(job.f0, job.f1), job.f0.getUserCodeClassLoader());
 			}
 			catch (Exception ex) {
 				LOG.error("Error submitting job to the job-manager.", ex);
@@ -329,7 +327,8 @@ public class JobSubmissionServlet extends HttpServlet {
 
 			// redirect to the start page
 			resp.sendRedirect(START_PAGE_URL);
-		} else if (action.equals(ACTION_BACK_VALUE)) {
+		}
+		else if (action.equals(ACTION_BACK_VALUE)) {
 			// remove the job from the map
 
 			String id = req.getParameter("id");
@@ -337,10 +336,11 @@ public class JobSubmissionServlet extends HttpServlet {
 				return;
 			}
 
-			Long uid = null;
+			Long uid;
 			try {
 				uid = Long.parseLong(id);
-			} catch (NumberFormatException nfex) {
+			}
+			catch (NumberFormatException nfex) {
 				showErrorPage(resp, "An invalid id for the job was provided.");
 				return;
 			}
@@ -350,9 +350,9 @@ public class JobSubmissionServlet extends HttpServlet {
 
 			// redirect to the start page
 			resp.sendRedirect(START_PAGE_URL);
-		} else {
+		}
+		else {
 			showErrorPage(resp, "Invalid action specified.");
-			return;
 		}
 	}
 
@@ -428,7 +428,7 @@ public class JobSubmissionServlet extends HttpServlet {
 	 *        The string to be split.
 	 * @return The array of split strings.
 	 */
-	private static final List<String> tokenizeArguments(String args) {
+	private static List<String> tokenizeArguments(String args) {
 		List<String> list = new ArrayList<String>();
 		StringBuilder curr = new StringBuilder();
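
The submission block above shows the reworked Client API used throughout this series: the Client is constructed from configuration alone, the job graph comes from a static helper, and the user-code class loader travels with the submission call instead of with the Client. Condensed, with job.f0/job.f1 as held by the servlet:

    Client client = new Client(GlobalConfiguration.getConfiguration());
    // detached: returns once the job is submitted, without waiting for the result
    client.runDetached(Client.getJobGraph(job.f0, job.f1), job.f0.getUserCodeClassLoader());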
 

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java
index 751783c..c1df541 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java
@@ -18,28 +18,19 @@
 
 package org.apache.flink.client;
 
-import org.apache.flink.client.cli.CommandLineOptions;
-import org.apache.flink.client.program.Client;
-import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.configuration.Configuration;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.net.InetAddress;
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
 
 import static org.junit.Assert.*;
 
 public class CliFrontendInfoTest {
 	
-	@BeforeClass
-	public static void init() {
-		CliFrontendTestUtils.pipeSystemOutToNull();
-		CliFrontendTestUtils.clearGlobalConfiguration();
-	}
-	
+	private static PrintStream stdOut;
+	private static PrintStream capture;
+	private static ByteArrayOutputStream buffer;
+
 	@Test
 	public void testErrorCases() {
 		try {
@@ -67,71 +58,49 @@ public class CliFrontendInfoTest {
 	
 	@Test
 	public void testShowExecutionPlan() {
+		replaceStdOut();
 		try {
+
 			String[] parameters = new String[] { CliFrontendTestUtils.getTestJarPath() };
-			InfoTestCliFrontend testFrontend = new InfoTestCliFrontend(-1);
+			CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
 			int retCode = testFrontend.info(parameters);
 			assertTrue(retCode == 0);
+			assertTrue(buffer.toString().contains("\"parallelism\": \"1\""));
 		}
 		catch (Exception e) {
 			e.printStackTrace();
 			fail("Program caused an exception: " + e.getMessage());
+		} finally {
+			restoreStdOut();
 		}
 	}
 	
 	@Test
 	public void testShowExecutionPlanWithParallelism() {
+		replaceStdOut();
 		try {
 			String[] parameters = {"-p", "17", CliFrontendTestUtils.getTestJarPath()};
-			InfoTestCliFrontend testFrontend = new InfoTestCliFrontend(17);
+			CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
 			int retCode = testFrontend.info(parameters);
 			assertTrue(retCode == 0);
+			assertTrue(buffer.toString().contains("\"parallelism\": \"17\""));
 		}
 		catch (Exception e) {
 			e.printStackTrace();
 			fail("Program caused an exception: " + e.getMessage());
+		} finally {
+			restoreStdOut();
 		}
 	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	private static final class InfoTestCliFrontend extends CliFrontend {
-		
-		private final int expectedDop;
-		
-		public InfoTestCliFrontend(int expectedDop) throws Exception {
-			super(CliFrontendTestUtils.getConfigDir());
-			this.expectedDop = expectedDop;
-		}
-
-		@Override
-		protected Client getClient(CommandLineOptions options, ClassLoader loader, String programName, int par)
-				throws Exception {
-			Configuration config = new Configuration();
 
-			config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, InetAddress.getLocalHost().getHostName());
-			config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 6176);
-
-			return new TestClient(config, expectedDop);
-		}
+	private static void replaceStdOut() {
+		stdOut = System.out;
+		buffer = new ByteArrayOutputStream();
+		capture = new PrintStream(buffer);
+		System.setOut(capture);
 	}
-	
-	private static final class TestClient extends Client {
-		
-		private final int expectedDop;
-		
-		private TestClient(Configuration config, int expectedDop) throws Exception {
-			super(config, CliFrontendInfoTest.class.getClassLoader(), -1);
-			
-			this.expectedDop = expectedDop;
-		}
-		
-		@Override
-		public String getOptimizedPlanAsJson(PackagedProgram prog, int parallelism)
-				throws CompilerException, ProgramInvocationException
-		{
-			assertEquals(this.expectedDop, parallelism);
-			return "";
-		}
+
+	private static void restoreStdOut() {
+		System.setOut(stdOut);
 	}
 }
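
The rewritten test verifies the printed JSON plan by swapping System.out for an in-memory buffer. The capture idiom is plain JDK and reusable on its own; a self-contained version:

    java.io.ByteArrayOutputStream buffer = new java.io.ByteArrayOutputStream();
    java.io.PrintStream stdOut = System.out;
    System.setOut(new java.io.PrintStream(buffer));
    try {
        System.out.println("{\"parallelism\": \"1\"}"); // stands in for the plan dump
        assert buffer.toString().contains("\"parallelism\": \"1\"");
    } finally {
        System.setOut(stdOut); // always restore, or later tests lose their output
    }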

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
index c9ce12b..1718ba5 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
@@ -38,6 +38,9 @@ import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -294,11 +297,10 @@ public class CliFrontendPackageProgramTest {
 			assertArrayEquals(progArgs, prog.getArguments());
 
 			Configuration c = new Configuration();
-			c.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
-			Client cli = new Client(c, getClass().getClassLoader());
-			
+			Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), c);
+
 			// we expect this to fail with a "ClassNotFoundException"
-			cli.getOptimizedPlanAsJson(prog, 666);
+			Client.getOptimizedPlanAsJson(compiler, prog, 666);
 			fail("Should have failed with a ClassNotFoundException");
 		}
 		catch (ProgramInvocationException e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
index a7944ce..f910312 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
@@ -22,10 +22,12 @@ package org.apache.flink.client;
 import static org.apache.flink.client.CliFrontendTestUtils.*;
 import static org.junit.Assert.*;
 
+import org.apache.flink.client.cli.CommandLineOptions;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.PackagedProgram;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 
 public class CliFrontendRunTest {
@@ -44,16 +46,16 @@ public class CliFrontendRunTest {
 				String[] parameters = {"-v", "-l", "-a", "some", "program", "arguments"};
 				CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
 				int retCode = testFrontend.run(parameters);
-				assertTrue(retCode != 0);
+				assertNotEquals(0, retCode);
 			}
-			
+
 			// test without parallelism
 			{
 				String[] parameters = {"-v", getTestJarPath()};
 				RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(-1, true);
 				assertEquals(0, testFrontend.run(parameters));
 			}
-			
+
 			// test configure parallelism
 			{
 				String[] parameters = {"-v", "-p", "42",  getTestJarPath()};
@@ -72,14 +74,14 @@ public class CliFrontendRunTest {
 			{
 				String[] parameters = {"-v", "-p", "text",  getTestJarPath()};
 				CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
-				assertTrue(0 != testFrontend.run(parameters));
+				assertNotEquals(0, testFrontend.run(parameters));
 			}
-			
+
 			// test configure parallelism with overflow integer value
 			{
 				String[] parameters = {"-v", "-p", "475871387138",  getTestJarPath()};
 				CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
-				assertTrue(0 != testFrontend.run(parameters));
+				assertNotEquals(0, testFrontend.run(parameters));
 			}
 		}
 		catch (Exception e) {
@@ -87,7 +89,7 @@ public class CliFrontendRunTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	
 	public static final class RunTestingCliFrontend extends CliFrontend {
@@ -102,10 +104,20 @@ public class CliFrontendRunTest {
 		}
 
 		@Override
-		protected int executeProgram(PackagedProgram program, Client client, int parallelism, boolean wait) {
+		protected int executeProgramDetached(PackagedProgram program, Client client, int parallelism) {
 			assertEquals(this.expectedParallelism, parallelism);
-			assertEquals(client.getPrintStatusDuringExecution(), sysoutLogging);
+			assertEquals(this.sysoutLogging, client.getPrintStatusDuringExecution());
+			return 0;
+		}
+
+		@Override
+		protected int executeProgramBlocking(PackagedProgram program, Client client, int parallelism) {
 			return 0;
 		}
+
+		@Override
+		protected Client getClient(CommandLineOptions options, String programName, int userParallelism) throws Exception {
+			return Mockito.mock(Client.class);
+		}
 	}
 }
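
RunTestingCliFrontend now stubs the split execution hooks (detached vs. blocking) and never contacts a JobManager, because getClient() hands back a Mockito mock. The same stubbing works on its own, assuming only the getter shown in the diff:

    Client client = Mockito.mock(Client.class);
    Mockito.when(client.getPrintStatusDuringExecution()).thenReturn(true);
    // the mock records interactions but performs no actual submission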

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
index 7f67567..be3fe89 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
@@ -22,9 +22,9 @@ import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.client.program.ProgramInvocationException;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
@@ -50,7 +50,7 @@ public class RemoteExecutorHostnameResolutionTest {
 			exec.executePlan(getProgram());
 			fail("This should fail with an ProgramInvocationException");
 		}
-		catch (ProgramInvocationException e) {
+		catch (IOException e) {
 			// that is what we want!
 			assertTrue(e.getCause() instanceof UnknownHostException);
 		}
@@ -72,7 +72,7 @@ public class RemoteExecutorHostnameResolutionTest {
 			exec.executePlan(getProgram());
 			fail("This should fail with an ProgramInvocationException");
 		}
-		catch (ProgramInvocationException e) {
+		catch (IOException e) {
 			// that is what we want!
 			assertTrue(e.getCause() instanceof UnknownHostException);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
index 1b9fd73..ef2161e 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.client.program;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.net.NetUtils;
 import org.junit.Test;
@@ -80,7 +79,7 @@ public class ClientConnectionTest {
 		testFailureBehavior(unreachableEndpoint);
 	}
 
-	private void testFailureBehavior(InetSocketAddress unreachableEndpoint) {
+	private void testFailureBehavior(final InetSocketAddress unreachableEndpoint) {
 
 		final Configuration config = new Configuration();
 		config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, (ASK_STARTUP_TIMEOUT/1000) + " s");
@@ -93,16 +92,13 @@ public class ClientConnectionTest {
 			JobVertex vertex = new JobVertex("Test Vertex");
 			vertex.setInvokableClass(TestInvokable.class);
 
-			final JobGraph jg = new JobGraph("Test Job", vertex);
-			final Client client = new Client(config, getClass().getClassLoader(), -1);
-
 			final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
 
 			Thread invoker = new Thread("test invoker") {
 				@Override
 				public void run() {
 					try {
-						client.run(jg, true);
+						new Client(config);
 						fail("This should fail with an exception since the JobManager is unreachable.");
 					}
 					catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index bc898b3..621ef63 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -22,20 +22,25 @@ import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.actor.Status;
 
+
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.costs.CostEstimator;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.FlinkUntypedActor;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.messages.JobManagerMessages;
@@ -45,78 +50,56 @@ import org.apache.flink.runtime.util.SerializedThrowable;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.junit.runner.RunWith;
+
 
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
 
-import scala.Some;
-import scala.Tuple2;
+import java.util.Collections;
+
 
 import java.util.UUID;
 
 import static org.junit.Assert.*;
-import static org.mockito.Matchers.any;
 
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import static org.powermock.api.mockito.PowerMockito.whenNew;
 
 /**
  * Simple and maybe stupid test to check the {@link Client} class.
  */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(Client.class)
 public class ClientTest {
 
 	private PackagedProgram program;
-	private Optimizer compilerMock;
-	private JobGraphGenerator generatorMock;
-
 
 	private Configuration config;
 
 	private ActorSystem jobManagerSystem;
 
-	private JobGraph jobGraph = new JobGraph("test graph");
 
 	@Before
 	public void setUp() throws Exception {
 
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.generateSequence(1, 1000).output(new DiscardingOutputFormat<Long>());
+		
+		Plan plan = env.createProgramPlan();
+		JobWithJars jobWithJars = new JobWithJars(plan, Collections.<String>emptyList());
+
+		program = mock(PackagedProgram.class);
+		when(program.getPlanWithJars()).thenReturn(jobWithJars);
+		
 		final int freePort = NetUtils.getAvailablePort();
 		config = new Configuration();
 		config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
 		config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, freePort);
 		config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT);
 
-		program = mock(PackagedProgram.class);
-		compilerMock = mock(Optimizer.class);
-		generatorMock = mock(JobGraphGenerator.class);
-
-		JobWithJars planWithJarsMock = mock(JobWithJars.class);
-		Plan planMock = mock(Plan.class);
-		OptimizedPlan optimizedPlanMock = mock(OptimizedPlan.class);
-
-		when(planMock.getJobName()).thenReturn("MockPlan");
-
-		when(program.getPlanWithJars()).thenReturn(planWithJarsMock);
-		when(planWithJarsMock.getPlan()).thenReturn(planMock);
-
-		whenNew(Optimizer.class).withArguments(any(DataStatistics.class), any(CostEstimator.class), any(Configuration.class)).thenReturn(this.compilerMock);
-		when(compilerMock.compile(planMock)).thenReturn(optimizedPlanMock);
-
-		whenNew(JobGraphGenerator.class).withAnyArguments().thenReturn(generatorMock);
-		when(generatorMock.compileJobGraph(optimizedPlanMock)).thenReturn(jobGraph);
-
 		try {
-			Tuple2<String, Object> address = new Tuple2<String, Object>("localhost", freePort);
-			jobManagerSystem = AkkaUtils.createActorSystem(config, new Some<Tuple2<String, Object>>(address));
+			scala.Tuple2<String, Object> address = new scala.Tuple2<String, Object>("localhost", freePort);
+			jobManagerSystem = AkkaUtils.createActorSystem(config, new scala.Some<scala.Tuple2<String, Object>>(address));
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -145,15 +128,12 @@ public class ClientTest {
 		try {
 			jobManagerSystem.actorOf(Props.create(SuccessReturningActor.class), JobManager.JOB_MANAGER_NAME());
 
-			Client out = new Client(config, getClass().getClassLoader());
-			JobSubmissionResult result = out.run(program.getPlanWithJars(), -1, false);
+			Client out = new Client(config);
+			JobSubmissionResult result = out.runDetached(program.getPlanWithJars(), 1);
 
 			assertNotNull(result);
 
 			program.deleteExtractedLibraries();
-
-			verify(this.compilerMock, times(1)).compile(any(Plan.class));
-			verify(this.generatorMock, times(1)).compileJobGraph(any(OptimizedPlan.class));
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -169,10 +149,10 @@ public class ClientTest {
 		try {
 			jobManagerSystem.actorOf(Props.create(FailureReturningActor.class), JobManager.JOB_MANAGER_NAME());
 
-			Client out = new Client(config, getClass().getClassLoader());
+			Client out = new Client(config);
 
 			try {
-				out.run(program.getPlanWithJars(), -1, false);
+				out.runDetached(program.getPlanWithJars(), 1);
 				fail("This should fail with an exception");
 			}
 			catch (ProgramInvocationException e) {
@@ -181,9 +161,6 @@ public class ClientTest {
 			catch (Exception e) {
 				fail("wrong exception " + e);
 			}
-
-			verify(this.compilerMock, times(1)).compile(any(Plan.class));
-			verify(this.generatorMock, times(1)).compileJobGraph(any(OptimizedPlan.class));
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -198,10 +175,10 @@ public class ClientTest {
 	@Test
 	public void tryLocalExecution() {
 		try {
+			jobManagerSystem.actorOf(Props.create(SuccessReturningActor.class), JobManager.JOB_MANAGER_NAME());
+			
 			PackagedProgram packagedProgramMock = mock(PackagedProgram.class);
-
 			when(packagedProgramMock.isUsingInteractiveMode()).thenReturn(true);
-
 			doAnswer(new Answer<Void>() {
 				@Override
 				public Void answer(InvocationOnMock invocation) throws Throwable {
@@ -211,7 +188,7 @@ public class ClientTest {
 			}).when(packagedProgramMock).invokeInteractiveModeForExecution();
 
 			try {
-				new Client(config, getClass().getClassLoader()).run(packagedProgramMock, 1, true);
+				new Client(config).runBlocking(packagedProgramMock, 1);
 				fail("Creating the local execution environment should not be possible");
 			}
 			catch (InvalidProgramException e) {
@@ -224,6 +201,34 @@ public class ClientTest {
 		}
 	}
 
+	@Test
+	public void testGetExecutionPlan() {
+		try {
+			jobManagerSystem.actorOf(Props.create(FailureReturningActor.class), JobManager.JOB_MANAGER_NAME());
+			
+			PackagedProgram prg = new PackagedProgram(TestOptimizerPlan.class, "/dev/random", "/tmp");
+			assertNotNull(prg.getPreviewPlan());
+			
+			Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
+			OptimizedPlan op = (OptimizedPlan) Client.getOptimizedPlan(optimizer, prg, 1);
+			assertNotNull(op);
+
+			PlanJSONDumpGenerator dumper = new PlanJSONDumpGenerator();
+			assertNotNull(dumper.getOptimizerPlanAsJSON(op));
+
+			// test HTML escaping
+			PlanJSONDumpGenerator dumper2 = new PlanJSONDumpGenerator();
+			dumper2.setEncodeForHTML(true);
+			String htmlEscaped = dumper2.getOptimizerPlanAsJSON(op);
+
+			assertEquals(-1, htmlEscaped.indexOf('\\'));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	public static class SuccessReturningActor extends FlinkUntypedActor {
@@ -273,4 +278,33 @@ public class ClientTest {
 			return leaderSessionID;
 		}
 	}
+	
+	public static class TestOptimizerPlan implements ProgramDescription {
+
+		@SuppressWarnings("serial")
+		public static void main(String[] args) throws Exception {
+			if (args.length < 2) {
+				System.err.println("Usage: TestOptimizerPlan <input-file-path> <output-file-path>");
+				return;
+			}
+
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+			DataSet<Tuple2<Long, Long>> input = env.readCsvFile(args[0])
+					.fieldDelimiter("\t").types(Long.class, Long.class);
+
+			DataSet<Tuple2<Long, Long>> result = input.map(
+					new MapFunction<Tuple2<Long,Long>, Tuple2<Long,Long>>() {
+						public Tuple2<Long, Long> map(Tuple2<Long, Long> value){
+							return new Tuple2<Long, Long>(value.f0, value.f1+1);
+						}
+					});
+			result.writeAsCsv(args[1], "\n", "\t");
+			env.execute();
+		}
+		@Override
+		public String getDescription() {
+			return "TestOptimizerPlan <input-file-path> <output-file-path>";
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
index f156f77..116c1e6 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
@@ -23,8 +23,7 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.LocalEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.client.CliFrontendTestUtils;
-import org.junit.BeforeClass;
+
 import org.junit.Test;
 
 import static org.junit.Assert.*;
@@ -32,14 +31,10 @@ import static org.junit.Assert.*;
 @SuppressWarnings("serial")
 public class ExecutionPlanAfterExecutionTest implements java.io.Serializable {
 
-	@BeforeClass
-	public static void suppressOutput() {
-		CliFrontendTestUtils.pipeSystemOutToNull();
-	}
-
 	@Test
 	public void testExecuteAfterGetExecutionPlan() {
-		ExecutionEnvironment env = new LocalEnvironment();
+		ExecutionEnvironment env = new LocalEnvironment(); 
+		env.getConfig().disableSysoutLogging();
 		
 		DataSet<Integer> baseSet = env.fromElements(1, 2);
 
@@ -51,7 +46,9 @@ public class ExecutionPlanAfterExecutionTest implements java.io.Serializable {
 		try {
 			env.getExecutionPlan();
 			env.execute();
-		} catch (Exception e) {
+		}
+		catch (Exception e) {
+			e.printStackTrace();
 			fail("Cannot run both #getExecutionPlan and #execute.");
 		}
 	}
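
The guarantee under test: asking an environment for its JSON plan must not consume the program, so a subsequent execute() on the same environment still runs it. In short:

    ExecutionEnvironment env = new LocalEnvironment();
    env.getConfig().disableSysoutLogging();
    env.fromElements(1, 2).output(new DiscardingOutputFormat<Integer>());
    String json = env.getExecutionPlan(); // plan preview...
    env.execute();                        // ...then a real run; both must succeed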

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
index d1e971f..be2caaf 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
@@ -29,6 +29,9 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.configuration.Configuration;
@@ -49,9 +52,9 @@ public class ExecutionPlanCreationTest {
 
 			config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, mockJmAddress.getHostName());
 			config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, mockJmAddress.getPort());
-			
-			Client client = new Client(config, getClass().getClassLoader(), -1);
-			OptimizedPlan op = (OptimizedPlan) client.getOptimizedPlan(prg, -1);
+
+			Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
+			OptimizedPlan op = (OptimizedPlan) Client.getOptimizedPlan(optimizer, prg, -1);
 			assertNotNull(op);
 			
 			PlanJSONDumpGenerator dumper = new PlanJSONDumpGenerator();
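
This is the pattern that replaces the old instance method: build an Optimizer explicitly and hand it to the static Client helper, so compiling a plan no longer requires a JobManager connection. Condensed from the diff above:

    Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
    OptimizedPlan op = (OptimizedPlan) Client.getOptimizedPlan(optimizer, prg, -1); // -1: default parallelism
    String json = new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(op);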

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
index 1a9f17f..95506f4 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
@@ -22,7 +22,6 @@ import java.io.File;
 import java.io.PrintStream;
 
 import org.apache.flink.client.CliFrontendTestUtils;
-import org.apache.flink.client.program.PackagedProgram;
 import org.junit.Assert;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
index 99e4906..7078e90 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
@@ -31,6 +31,7 @@ import backtype.storm.generated.NotAliveException;
 import backtype.storm.utils.NimbusClient;
 import backtype.storm.utils.Utils;
 
+import com.google.common.collect.Lists;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.JobWithJars;
@@ -55,7 +56,6 @@ import scala.concurrent.duration.FiniteDuration;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -174,26 +174,26 @@ public class FlinkClient {
 			throw new RuntimeException("Problem with jar file " + uploadedJarFile.getAbsolutePath(), e);
 		}
 
-		final List<File> jarFiles = new ArrayList<File>();
-		jarFiles.add(uploadedJarFile);
-
 		final JobGraph jobGraph = topology.getStreamGraph().getJobGraph(name);
 		jobGraph.addJar(new Path(uploadedJarFile.getAbsolutePath()));
 
 		final Configuration configuration = jobGraph.getJobConfiguration();
 
-		final Client client;
-
 		configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerHost);
 		configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
 
-		client = new Client(
-			configuration,
-			JobWithJars.buildUserCodeClassLoader(jarFiles, JobWithJars.class.getClassLoader()),
-			-1);
+		final Client client;
+		try {
+			client = new Client(configuration);
+		} catch (IOException e) {
+			throw new RuntimeException("Could not establish a connection to the job manager", e);
+		}
 
 		try {
-			client.run(jobGraph, false);
+			ClassLoader classLoader = JobWithJars.buildUserCodeClassLoader(
+					Lists.newArrayList(uploadedJarFile),
+					this.getClass().getClassLoader());
+			client.runDetached(jobGraph, classLoader);
 		} catch (final ProgramInvocationException e) {
 			throw new RuntimeException("Cannot execute job due to ProgramInvocationException", e);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java b/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
index bf06c75..92d2b98 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.common;
 
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
@@ -29,7 +30,7 @@ public class JobExecutionResult extends JobSubmissionResult {
 
 	private long netRuntime;
 
-	private Map<String, Object> accumulatorResults;
+	private Map<String, Object> accumulatorResults = Collections.emptyMap();
 
 	/**
 	 * Creates a new JobExecutionResult.
@@ -41,7 +42,10 @@ public class JobExecutionResult extends JobSubmissionResult {
 	public JobExecutionResult(JobID jobID, long netRuntime, Map<String, Object> accumulators) {
 		super(jobID);
 		this.netRuntime = netRuntime;
-		this.accumulatorResults = accumulators;
+
+		if (accumulators != null) {
+			this.accumulatorResults = accumulators;
+		}
 	}
 
 	/**
@@ -106,4 +110,13 @@ public class JobExecutionResult extends JobSubmissionResult {
 		}
 		return (Integer) result;
 	}
+
+	/**
+	 * Returns a dummy object for wrapping a JobSubmissionResult.
+	 * @param result The JobSubmissionResult to wrap.
+	 * @return a JobExecutionResult
+	 */
+	public static JobExecutionResult fromJobSubmissionResult(JobSubmissionResult result) {
+		return new JobExecutionResult(result.getJobID(), -1, null);
+	}
 }
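
Together, the empty-map default and the new factory let a detached submission be wrapped without null checks downstream. A small sketch (the accumulator accessor is assumed from the surrounding class):

    JobSubmissionResult submission = new JobSubmissionResult(JobID.generate());
    JobExecutionResult result = JobExecutionResult.fromJobSubmissionResult(submission);
    // netRuntime carries the dummy value -1; accumulators are an empty map, not null
    boolean empty = result.getAllAccumulatorResults().isEmpty(); // true (accessor assumed)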

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-core/src/main/java/org/apache/flink/api/common/JobID.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/JobID.java b/flink-core/src/main/java/org/apache/flink/api/common/JobID.java
index 7478da4..13a1a32 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/JobID.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/JobID.java
@@ -15,35 +15,75 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.common;
 
-import javax.xml.bind.DatatypeConverter;
 import org.apache.flink.util.AbstractID;
+import javax.xml.bind.DatatypeConverter;
 import java.nio.ByteBuffer;
 
 /**
- * Unique Job Identifier
+ * Unique (at least statistically unique) identifier for a Flink Job. Jobs in Flink correspond
+ * to dataflow graphs.
+ * 
+ * <p>Jobs act simultaneously as <i>sessions</i>, because jobs can be created and submitted
+ * incrementally in different parts. Newer fragments of a graph can be attached to existing
+ * graphs, thereby extending the current data flow graphs.</p>
  */
 public final class JobID extends AbstractID {
 
 	private static final long serialVersionUID = 1L;
-	
+
+	/**
+	 * Creates a new (statistically) random JobID.
+	 */
 	public JobID() {
 		super();
 	}
 
+	/**
+	 * Creates a new JobID, using the given lower and upper parts.
+	 * 
+	 * @param lowerPart The lower 8 bytes of the ID. 
+	 * @param upperPart The upper 8 bytes of the ID.
+	 */
 	public JobID(long lowerPart, long upperPart) {
 		super(lowerPart, upperPart);
 	}
 
+	/**
+	 * Creates a new JobID from the given byte sequence. The byte sequence must be
+	 * exactly 16 bytes long. The first eight bytes make up the lower part of the ID,
+	 * while the next 8 bytes make up the upper part of the ID.
+	 *
+	 * @param bytes The byte sequence.
+	 */
 	public JobID(byte[] bytes) {
 		super(bytes);
 	}
+	
+	// ------------------------------------------------------------------------
+	//  Static factory methods
+	// ------------------------------------------------------------------------
 
+	/**
+	 * Creates a new (statistically) random JobID.
+	 * 
+	 * @return A new random JobID.
+	 */
 	public static JobID generate() {
 		return new JobID();
 	}
 
+	/**
+	 * Creates a new JobID from the given byte sequence. The byte sequence must be
+	 * exactly 16 bytes long. The first eight bytes make up the lower part of the ID,
+	 * while the next 8 bytes make up the upper part of the ID.
+	 * 
+	 * @param bytes The byte sequence.
+	 *                 
+	 * @return A new JobID corresponding to the ID encoded in the bytes.
+	 */
 	public static JobID fromByteArray(byte[] bytes) {
 		return new JobID(bytes);
 	}
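
The byte-array constructor and factory give a 16-byte round trip, lower part first. A sketch (getBytes() is assumed to be inherited from AbstractID):

    JobID original = JobID.generate();
    byte[] bytes = original.getBytes();          // 16 bytes: lower 8, then upper 8 (accessor assumed)
    JobID restored = JobID.fromByteArray(bytes);
    // original.equals(restored) holds: both halves survive the round trip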

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java b/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java
index 5cea9d5..3a18eb4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java
@@ -15,13 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.common;
 
 /**
- * The result of a job submission.
- * Contains the JobID
+ * The result of submitting a job to a JobManager.
  */
 public class JobSubmissionResult {
+	
 	private JobID jobID;
 
 	public JobSubmissionResult(JobID jobID) {

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
index e07ea45..e0d1eb8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
@@ -43,12 +43,14 @@ import org.apache.flink.util.Visitable;
 import org.apache.flink.util.Visitor;
 
 /**
- * This class encapsulates a single job (an instantiated data flow), together with some parameters.
- * Parameters include the name and a default parallelism. The job is referenced by the data sinks,
- * from which a traversal reaches all connected nodes of the job.
+ * This class represents Flink programs, in the form of dataflow plans.
+ *
+ * <p>The dataflow is referenced by the data sinks, from which all connected
+ * operators of the data flow can be reached via backwards traversal.</p>
  */
 public class Plan implements Visitable<Operator<?>> {
 
+	/** The default parallelism indicates to use the cluster's default */
 	private static final int DEFAULT_PARALELLISM = -1;
 	
 	/**
@@ -57,34 +59,31 @@ public class Plan implements Visitable<Operator<?>> {
 	 */
 	protected final List<GenericDataSinkBase<?>> sinks = new ArrayList<GenericDataSinkBase<?>>(4);
 
-	/**
-	 * The name of the job.
-	 */
+	/** The name of the job. */
 	protected String jobName;
 
-	/**
-	 * The default parallelism to use for nodes that have no explicitly specified parallelism.
-	 */
+	/** The default parallelism to use for nodes that have no explicitly specified parallelism. */
 	protected int defaultParallelism = DEFAULT_PARALELLISM;
 	
-	/**
-	 * Hash map for files in the distributed cache: registered name to cache entry.
-	 */
+	/** Hash map for files in the distributed cache: registered name to cache entry. */
 	protected HashMap<String, DistributedCacheEntry> cacheFile = new HashMap<String, DistributedCacheEntry>();
+	
+	/** Config object for runtime execution parameters. */
+	protected ExecutionConfig executionConfig;
 
-	/**
-	 * Config object for runtime execution parameters.
-	 */
-	protected ExecutionConfig executionConfig = null;
+	/** The ID of the Job that this dataflow plan belongs to */
+	private JobID jobId;
+	
+	private long sessionTimeout;
 
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Creates a new program plan with the given name, describing the data flow that ends at the
+	 * Creates a new dataflow plan with the given name, describing the data flow that ends at the
 	 * given data sinks.
-	 * <p>
-	 * If not all of the sinks of a data flow are given to the plan, the flow might
-	 * not be translated entirely. 
+	 * 
+	 * <p>If not all of the sinks of a data flow are given to the plan, the flow might
+	 * not be translated entirely.</p>
 	 *  
 	 * @param sinks The collection with the sinks of the job's data flow.
 	 * @param jobName The name to display for the job.
@@ -238,7 +237,37 @@ public class Plan implements Visitable<Operator<?>> {
 		checkNotNull(jobName, "The job name must not be null.");
 		this.jobName = jobName;
 	}
-	
+
+	/**
+	 * Gets the ID of the job that the dataflow plan belongs to.
+	 * If this ID is not set, then the dataflow represents its own
+	 * independent job.
+	 * 
+	 * @return The ID of the job that the dataflow plan belongs to.
+	 */
+	public JobID getJobId() {
+		return jobId;
+	}
+
+	/**
+	 * Sets the ID of the job that the dataflow plan belongs to.
+	 * If this ID is set to {@code null}, then the dataflow represents its own
+	 * independent job.
+	 * 
+	 * @param jobId The ID of the job that the dataflow plan belongs to.
+	 */
+	public void setJobId(JobID jobId) {
+		this.jobId = jobId;
+	}
+
+	public void setSessionTimeout(long sessionTimeout) {
+		this.sessionTimeout = sessionTimeout;
+	}
+
+	public long getSessionTimeout() {
+		return sessionTimeout;
+	}
+
 	/**
 	 * Gets the default parallelism for this job. That degree is always used when an operator
 	 * is not explicitly given a parallelism.
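
These accessors are the hook the environments use to stamp a dataflow with its session before execution; commit d58caa8e at the top of this mail temporarily comments the call sites out. The wiring as it stood in LocalEnvironment/RemoteEnvironment (env, jobID, sessionTimeout and executor come from the surrounding environment):

    Plan p = env.createProgramPlan(jobName);
    p.setJobId(jobID);                    // null means a standalone, independent job
    p.setSessionTimeout(sessionTimeout);  // seconds to keep the session alive
    JobExecutionResult result = executor.executePlan(p);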

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
index 1294011..514692f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common;
 
 import org.apache.flink.configuration.Configuration;
@@ -26,14 +25,20 @@ import java.util.Collections;
 import java.util.List;
 
 /**
- * A PlanExecutor runs a plan. The specific implementation (such as the org.apache.flink.client.LocalExecutor
- * and org.apache.flink.client.RemoteExecutor) determines where and how to run the plan.
+ * A PlanExecutor executes a Flink program's dataflow plan. All Flink programs are translated to
+ * dataflow plans prior to execution.
+ * 
+ * <p>The specific implementation (such as the org.apache.flink.client.LocalExecutor
+ * and org.apache.flink.client.RemoteExecutor) determines where and how to run the dataflow.
+ * The concrete implementations of the executors are loaded dynamically, because they depend on
+ * the full set of all runtime classes.</p>
  * 
- * The concrete implementations are loaded dynamically, because they depend on the full set of
- * dependencies of all runtime classes.
+ * <p>PlanExecutors can be started explicitly, in which case they keep running until stopped. If
+ * a program is submitted to a plan executor that is not running, it will start up for that
+ * program, and shut down afterwards.</p>
  */
 public abstract class PlanExecutor {
-	
+
 	private static final String LOCAL_EXECUTOR_CLASS = "org.apache.flink.client.LocalExecutor";
 	private static final String REMOTE_EXECUTOR_CLASS = "org.apache.flink.client.RemoteExecutor";
 
@@ -43,21 +48,68 @@ public abstract class PlanExecutor {
 	
 	/** If true, all execution progress updates are not only logged, but also printed to System.out */
 	private boolean printUpdatesToSysout = true;
-	
+
+	/**
+	 * Sets whether the executor should print progress results to "standard out" ({@link System#out}).
+	 * All progress messages are logged using the configured logging framework independent of the value
+	 * set here.
+	 * 
+	 * @param printStatus True, to print progress updates to standard out, false to not do that. 
+	 */
 	public void setPrintStatusDuringExecution(boolean printStatus) {
 		this.printUpdatesToSysout = printStatus;
 	}
-	
+
+	/**
+	 * Gets whether the executor prints progress results to "standard out" ({@link System#out}).
+	 * 
+	 * @return True, if the executor prints progress messages to standard out, false if not.
+	 */
 	public boolean isPrintingStatusDuringExecution() {
 		return this.printUpdatesToSysout;
 	}
+
+	// ------------------------------------------------------------------------
+	//  Startup & Shutdown
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Starts the program executor. After the executor has been started, it will keep
+	 * running until {@link #stop()} is called. 
+	 * 
+	 * @throws Exception Thrown, if the executor startup failed.
+	 */
+	public abstract void start() throws Exception;
+
+	/**
+	 * Shuts down the plan executor and releases all local resources.
+	 *
+	 * <p>This method also ends all sessions created by this executor. Remote job executions
+	 * may complete, but the session is not kept alive after that.</p>
+	 *
+	 * @throws Exception Thrown, if the proper shutdown failed. 
+	 */
+	public abstract void stop() throws Exception;
+
+	/**
+	 * Checks if this executor is currently running.
+	 * 
+	 * @return True if the executor is running, false otherwise.
+	 */
+	public abstract boolean isRunning();
 	
 	// ------------------------------------------------------------------------
 	//  Program Execution
 	// ------------------------------------------------------------------------
 	
 	/**
-	 * Execute the given plan and return the runtime in milliseconds.
+	 * Execute the given program.
+	 * 
+	 * <p>If the executor has not been started before, then this method will start the
+	 * executor and stop it after the execution has completed. This implies that one needs
+	 * to explicitly start the executor for all programs where multiple dataflow parts
+	 * depend on each other. Otherwise, the previous parts will no longer
+	 * be available, because the executor immediately shuts down after the execution.</p>
 	 * 
 	 * @param plan The plan of the program to execute.
 	 * @return The execution result, containing for example the net runtime of the program, and the accumulators.
@@ -66,7 +118,6 @@ public abstract class PlanExecutor {
 	 */
 	public abstract JobExecutionResult executePlan(Plan plan) throws Exception;
 	
-	
 	/**
 	 * Gets the programs execution plan in a JSON format.
 	 * 
@@ -77,7 +128,17 @@ public abstract class PlanExecutor {
 	 */
 	public abstract String getOptimizerPlanAsJSON(Plan plan) throws Exception;
 
-
+	/**
+	 * Ends the job session, identified by the given JobID. Jobs can be kept around as sessions,
+	 * if a session timeout is specified. Keeping Jobs as sessions allows users to incrementally
+	 * add new operations to their dataflow that refer to previous intermediate results of the
+	 * dataflow.
+	 * 
+	 * @param jobID The JobID identifying the job session.
+	 * @throws Exception Thrown, if the message to finish the session cannot be delivered.
+	 */
+	public abstract void endSession(JobID jobID) throws Exception;
+	
 	// ------------------------------------------------------------------------
 	//  Executor Factories
 	// ------------------------------------------------------------------------
@@ -102,7 +163,7 @@ public abstract class PlanExecutor {
 	/**
 	 * Creates an executor that runs the plan on a remote environment. The remote executor is typically used
 	 * to send the program to a cluster for execution.
-	 * 
+	 *
 	 * @param hostname The address of the JobManager to send the program to.
 	 * @param port The port of the JobManager to send the program to.
 	 * @param clientConfiguration The configuration for the client (Akka, default.parallelism).

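For context, a minimal sketch of the executor lifecycle this abstract class describes: start the executor explicitly so it survives across submissions, then end the session and stop it. This is hypothetical usage, not part of the commit; in particular, Plan#getJobId is assumed as the getter mirroring the setJobId call used elsewhere in this change.

    import org.apache.flink.api.common.JobExecutionResult;
    import org.apache.flink.api.common.Plan;
    import org.apache.flink.api.common.PlanExecutor;
    import org.apache.flink.configuration.Configuration;

    public class ExecutorLifecycle {
        // "plan" is assumed to be a compiled dataflow plan with its job ID already set
        static JobExecutionResult runInSession(Plan plan) throws Exception {
            PlanExecutor executor = PlanExecutor.createLocalExecutor(new Configuration());
            executor.start();                         // keep the executor alive across submissions
            try {
                // further dataflow parts could be submitted to the same executor here
                return executor.executePlan(plan);
            } finally {
                executor.endSession(plan.getJobId()); // hypothetical getter mirroring setJobId
                executor.stop();                      // explicit shutdown releases all resources
            }
        }
    }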
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java
index 51e91d7..dbb7cc0 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java
@@ -52,4 +52,8 @@ public class CollectionEnvironment extends ExecutionEnvironment {
 	public String getExecutionPlan() throws Exception {
 		throw new UnsupportedOperationException("Execution plans are not used for collection-based execution.");
 	}
+
+	@Override
+	public void startNewSession() throws Exception {
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 64b3a1d..23b5a57 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -26,11 +26,11 @@ import java.util.Calendar;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
-import java.util.UUID;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
 import org.apache.flink.api.common.io.FileInputFormat;
@@ -94,7 +94,8 @@ import com.google.common.base.Preconditions;
  */
 public abstract class ExecutionEnvironment {
 
-	private static final Logger LOG = LoggerFactory.getLogger(ExecutionEnvironment.class);
+	/** The logger used by the environment and its subclasses */
+	protected static final Logger LOG = LoggerFactory.getLogger(ExecutionEnvironment.class);
 	
 	/** The environment of the context (local by default, cluster if invoked through command line) */
 	private static ExecutionEnvironmentFactory contextEnvironmentFactory;
@@ -106,34 +107,42 @@ public abstract class ExecutionEnvironment {
 	private static boolean allowLocalExecution = true;
 	
 	// --------------------------------------------------------------------------------------------
-	
-	private final UUID executionId;
-	
+
 	private final List<DataSink<?>> sinks = new ArrayList<DataSink<?>>();
 	
 	private final List<Tuple2<String, DistributedCacheEntry>> cacheFile = new ArrayList<Tuple2<String, DistributedCacheEntry>>();
 
 	private final ExecutionConfig config = new ExecutionConfig();
 
-	/** Result from the latest execution, to be make it retrievable when using eager execution methods */
+	/** Result from the latest execution, to make it retrievable when using eager execution methods */
 	protected JobExecutionResult lastJobExecutionResult;
+
+	/** The ID of the session, defined by this execution environment. Sessions and Jobs are the same in
+	 *  Flink, as Jobs can consist of multiple parts that are attached to the growing dataflow graph */
+	protected JobID jobID;
+	
+	/** The session timeout in seconds */
+	protected long sessionTimeout;
 	
 	/** Flag to indicate whether sinks have been cleared in previous executions */
 	private boolean wasExecuted = false;
-
-	// --------------------------------------------------------------------------------------------
-	//  Constructor and Properties
-	// --------------------------------------------------------------------------------------------
+	
 	
 	/**
 	 * Creates a new Execution Environment.
 	 */
 	protected ExecutionEnvironment() {
-		this.executionId = UUID.randomUUID();
+		jobID = JobID.generate();
 	}
 
+	// --------------------------------------------------------------------------------------------
+	//  Properties
+	// --------------------------------------------------------------------------------------------
+	
 	/**
-	 * Gets the config object.
+	 * Gets the config object that defines execution parameters.
+	 * 
+	 * @return The environment's execution configuration.
 	 */
 	public ExecutionConfig getConfig() {
 		return config;
@@ -228,41 +237,72 @@ public abstract class ExecutionEnvironment {
 	}
 	
 	/**
-	 * Gets the UUID by which this environment is identified. The UUID sets the execution context
+	 * Returns the {@link org.apache.flink.api.common.JobExecutionResult} of the last executed job.
+	 * 
+	 * @return The execution result from the latest job execution.
+	 */
+	public JobExecutionResult getLastJobExecutionResult(){
+		return this.lastJobExecutionResult;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Session Management
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Gets the JobID by which this environment is identified. The JobID sets the execution context
 	 * in the cluster or local environment.
 	 *
-	 * @return The UUID of this environment.
+	 * @return The JobID of this environment.
 	 * @see #getIdString()
 	 */
-	public UUID getId() {
-		return this.executionId;
+	public JobID getId() {
+		return this.jobID;
 	}
 
 	/**
-	 * Returns the {@link org.apache.flink.api.common.JobExecutionResult} of the last executed job.
-	 * 
-	 * @return The execution result from the latest job execution.
+	 * Gets the JobID by which this environment is identified, as a string.
+	 *
+	 * @return The JobID as a string.
+	 * @see #getId()
 	 */
-	public JobExecutionResult getLastJobExecutionResult(){
-		return this.lastJobExecutionResult;
+	public String getIdString() {
+		return this.jobID.toString();
 	}
 
+	/**
+	 * Sets the session timeout, which controls how long the intermediate results of a job
+	 * are held. The updated timeout only takes effect for future executions.
+	 * 
+	 * @param timeout The timeout, in seconds.
+	 */
+	public void setSessionTimeout(long timeout) {
+		if (timeout < 0) {
+			throw new IllegalArgumentException("The session timeout must not be less than zero.");
+		}
+		this.sessionTimeout = timeout;
+	}
 
 	/**
-	 * Gets the UUID by which this environment is identified, as a string.
+	 * Gets the session timeout for this environment. The session timeout defines for how long
+	 * after an execution the job and its intermediate results will be kept for future
+	 * interactions.
 	 * 
-	 * @return The UUID as a string.
-	 * @see #getId()
+	 * @return The session timeout, in seconds.
 	 */
-	public String getIdString() {
-		return this.executionId.toString();
+	public long getSessionTimeout() {
+		return sessionTimeout;
 	}
 
+	/**
+	 * Starts a new session, discarding the previous data flow and all of its intermediate results.
+	 */
+	public abstract void startNewSession() throws Exception;
+
 	// --------------------------------------------------------------------------------------------
 	//  Registry for types and serializers
 	// --------------------------------------------------------------------------------------------
 
-
 	/**
 	 * Adds a new Kryo default serializer to the Runtime.
 	 *
@@ -944,7 +984,7 @@ public abstract class ExecutionEnvironment {
 				}
 				if(typeInfo instanceof CompositeType) {
 					List<GenericTypeInfo<?>> genericTypesInComposite = new ArrayList<GenericTypeInfo<?>>();
-					Utils.getContainedGenericTypes((CompositeType)typeInfo, genericTypesInComposite);
+					Utils.getContainedGenericTypes((CompositeType<?>)typeInfo, genericTypesInComposite);
 					for(GenericTypeInfo<?> gt : genericTypesInComposite) {
 						Serializers.recursivelyRegisterType(gt.getTypeClass(), config);
 					}
@@ -1176,4 +1216,5 @@ public abstract class ExecutionEnvironment {
 	public static boolean localExecutionIsAllowed() {
 		return allowLocalExecution;
 	}
+
 }

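A hedged sketch of how the session API added to ExecutionEnvironment above is meant to be driven from user code; the timeout value is arbitrary and the dataflow definitions are elided, and whether later parts can actually reuse earlier intermediate results depends on the runtime support added in this commit:

    import org.apache.flink.api.java.ExecutionEnvironment;

    public class SessionExample {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setSessionTimeout(3600);   // keep the job and its intermediate results for 1h
            // ... define the first part of the dataflow (sources, transformations, sinks) ...
            env.execute("part 1");
            // ... later parts attach to the same session, since the JobID is unchanged ...
            env.execute("part 2");
            env.startNewSession();         // discard the dataflow and its intermediate results
        }
    }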
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
index 27b6254..5fd272b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
@@ -20,49 +20,113 @@ package org.apache.flink.api.java;
 
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.PlanExecutor;
 import org.apache.flink.configuration.Configuration;
 
 /**
  * An {@link ExecutionEnvironment} that runs the program locally, multi-threaded, in the JVM where the
- * environment is instantiated. When this environment is instantiated, it uses a default parallelism
- * of {@code 1}. Local environments can also be instantiated through
- * {@link ExecutionEnvironment#createLocalEnvironment()} and {@link ExecutionEnvironment#createLocalEnvironment(int)}.
- * The former version will pick a default parallelism equal to the number of hardware contexts in the local
- * machine.
+ * environment is instantiated.
+ * 
+ * <p>When this environment is instantiated, it uses a default parallelism of {@code 1}. The default
+ * parallelism can be set via {@link #setParallelism(int)}.</p>
+ * 
+ * <p>Local environments can also be instantiated through {@link ExecutionEnvironment#createLocalEnvironment()}
+ * and {@link ExecutionEnvironment#createLocalEnvironment(int)}. The former version will pick a
+ * default parallelism equal to the number of hardware contexts in the local machine.</p>
  */
 public class LocalEnvironment extends ExecutionEnvironment {
+	
+	/** The user-defined configuration for the local execution */
 	private Configuration configuration;
+
+	/** Created lazily upon first use */
+	private PlanExecutor executor;
+
+	/** In case we keep the executor alive for sessions, this reaper shuts it down eventually.
+	 * The reaper's finalize method triggers the executor shutdown. */
+	@SuppressWarnings("all")
+	private ExecutorReaper executorReaper;
+	
 	/**
 	 * Creates a new local environment.
 	 */
 	public LocalEnvironment() {
-		if(!ExecutionEnvironment.localExecutionIsAllowed()) {
+		if (!ExecutionEnvironment.localExecutionIsAllowed()) {
 			throw new InvalidProgramException("The LocalEnvironment cannot be used when submitting a program through a client.");
 		}
+		this.configuration = new Configuration();
+	}
+
+	/**
+	 * Sets a configuration used to configure the local Flink executor.
+	 * If {@code null} is passed, then the default configuration will be used.
+	 * 
+	 * @param customConfiguration The configuration to be used for the local execution.
+	 */
+	public void setConfiguration(Configuration customConfiguration) {
+		this.configuration = customConfiguration != null ? customConfiguration : new Configuration();
 	}
 	
 	// --------------------------------------------------------------------------------------------
 	
 	@Override
 	public JobExecutionResult execute(String jobName) throws Exception {
+		if (executor == null) {
+			startNewSession();
+		}
+
 		Plan p = createProgramPlan(jobName);
-		
-		PlanExecutor executor = PlanExecutor.createLocalExecutor(configuration);
-		executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled());
-		this.lastJobExecutionResult = executor.executePlan(p);
-		return this.lastJobExecutionResult;
+		p.setJobId(jobID);
+		p.setSessionTimeout(sessionTimeout);
+
+		JobExecutionResult result = executor.executePlan(p);
+
+		this.lastJobExecutionResult = result;
+		return result;
 	}
 	
 	@Override
 	public String getExecutionPlan() throws Exception {
 		Plan p = createProgramPlan(null, false);
 		
-		PlanExecutor executor = PlanExecutor.createLocalExecutor(configuration);
-		return executor.getOptimizerPlanAsJSON(p);
+		// make sure that we do not start an executor here in any case.
+		// if one is running, fine; if not, we create a temporary executor and discard it right afterwards
+		if (executor != null) {
+			return executor.getOptimizerPlanAsJSON(p);
+		}
+		else {
+			PlanExecutor tempExecutor = PlanExecutor.createLocalExecutor(configuration);
+			return tempExecutor.getOptimizerPlanAsJSON(p);
+		}
 	}
-	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void startNewSession() throws Exception {
+		if (executor != null) {
+			// we need to end the previous session
+			executor.stop();
+			// create also a new JobID
+			jobID = JobID.generate();
+		}
+		
+		// create a new local executor
+		executor = PlanExecutor.createLocalExecutor(configuration);
+		executor.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
+		
+		// if we have a session, start the mini cluster eagerly to have it available across sessions
+		if (getSessionTimeout() > 0) {
+			executor.start();
+			
+			// also install the reaper that will shut it down eventually
+			executorReaper = new ExecutorReaper(executor);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
 	
 	@Override
 	public String toString() {
@@ -70,7 +134,91 @@ public class LocalEnvironment extends ExecutionEnvironment {
 				+ ") : " + getIdString();
 	}
 
-	public void setConfiguration(Configuration customConfiguration) {
-		this.configuration = customConfiguration;
+	// ------------------------------------------------------------------------
+	//  Reaping the local executor when in session mode
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This thread shuts down the local executor.
+	 *
+	 * <p><b>IMPORTANT:</b> This must be a static inner class to hold no reference to the outer class.
+	 * Otherwise, the outer class could never become garbage collectible while this thread runs.</p>
+	 */
+	private static class ShutdownThread extends Thread {
+
+		private final Object monitor = new Object();
+
+		private final PlanExecutor executor;
+
+		private volatile boolean running = true;
+		private volatile boolean triggered = false;
+
+		ShutdownThread(PlanExecutor executor) {
+			super("Local cluster reaper");
+			setDaemon(true);
+			setPriority(Thread.MIN_PRIORITY);
+
+			this.executor = executor;
+		}
+
+		@Override
+		public void run() {
+			synchronized (monitor) {
+				while (running && !triggered) {
+					try {
+						monitor.wait();
+					}
+					catch (InterruptedException e) {
+						// should never happen
+					}
+				}
+			}
+
+			if (running && triggered) {
+				try {
+					executor.stop();
+				}
+				catch (Throwable t) {
+					System.err.println("Cluster reaper caught exception during shutdown");
+					t.printStackTrace();
+				}
+			}
+		}
+
+		void trigger() {
+			triggered = true;
+			synchronized (monitor) {
+				monitor.notifyAll();
+			}
+		}
+
+		void cancel() {
+			running = false;
+			synchronized (monitor) {
+				monitor.notifyAll();
+			}
+		}
+	}
+
+	/**
+	 * A class that, upon finalization, shuts down the local mini cluster by triggering the reaper
+	 * thread.
+	 */
+	private static class ExecutorReaper {
+
+		private final ShutdownThread shutdownThread;
+
+		ExecutorReaper(PlanExecutor executor) {
+			this.shutdownThread = new ShutdownThread(executor);
+			this.shutdownThread.start();
+		}
+
+		@Override
+		protected void finalize() throws Throwable {
+			super.finalize();
+
+			shutdownThread.trigger();
+			shutdownThread.cancel();
+		}
 	}
 }

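The reaper in LocalEnvironment above relies on ShutdownThread being a static inner class: a non-static inner class would hold an implicit reference to the enclosing environment, so the environment could never become unreachable and the finalize()-triggered shutdown would never fire. A stripped-down sketch of the idiom, with hypothetical names:

    // The reaper holds only the resource, never the owner.
    final class Reaper {
        private final AutoCloseable resource;

        Reaper(AutoCloseable resource) {
            this.resource = resource;
        }

        @Override
        protected void finalize() throws Throwable {
            try {
                resource.close();   // runs once the owner (and thus the reaper) is unreachable
            } finally {
                super.finalize();
            }
        }
    }

    final class Owner {
        @SuppressWarnings("unused")
        private final Reaper reaper;   // collected together with the owner

        Owner(AutoCloseable resource) {
            this.reaper = new Reaper(resource);
        }
    }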
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
index 1f73e93..6ae1f26 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
@@ -19,30 +19,47 @@
 package org.apache.flink.api.java;
 
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.PlanExecutor;
 import org.apache.flink.configuration.Configuration;
 
 /**
- * An {@link ExecutionEnvironment} that sends programs 
- * to a cluster for execution. Note that all file paths used in the program must be accessible from the
- * cluster. The execution will use the cluster's default parallelism, unless the parallelism is
- * set explicitly via {@link ExecutionEnvironment#setParallelism(int)}.
+ * An {@link ExecutionEnvironment} that sends programs to a cluster for execution. The environment
+ * needs to be created with the address and port of the JobManager of the Flink cluster that
+ * should execute the programs.
+ * 
+ * <p>Many programs executed via the remote environment depend on additional classes. Such classes
+ * may be the classes of functions (transformation, aggregation, ...) or libraries. Those classes
+ * must be attached to the remote environment as JAR files, to allow the environment to ship the
+ * classes into the cluster for the distributed execution.</p>
  */
 public class RemoteEnvironment extends ExecutionEnvironment {
 	
+	/** The hostname of the JobManager */
 	protected final String host;
-	
+
+	/** The port of the JobManager main actor system */
 	protected final int port;
-	
+
+	/** The jar files that need to be attached to each job */
 	private final String[] jarFiles;
 
+	/** The remote executor lazily created upon first use */
+	private PlanExecutor executor;
+	
 	private Configuration clientConfiguration;
 	
+	
+	/** Optional shutdown hook, used in session mode to eagerly terminate the last session */
+	private Thread shutdownHook;
+
 	/**
 	 * Creates a new RemoteEnvironment that points to the master (JobManager) described by the
 	 * given host name and port.
 	 * 
+	 * <p>Each program execution will have all the given JAR files in its classpath.</p>
+	 * 
 	 * @param host The host name or address of the master (JobManager), where the program should be executed.
 	 * @param port The port of the master (JobManager), where the program should be executed. 
 	 * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the program uses
@@ -53,45 +70,137 @@ public class RemoteEnvironment extends ExecutionEnvironment {
 		if (host == null) {
 			throw new NullPointerException("Host must not be null.");
 		}
-		
 		if (port < 1 || port >= 0xffff) {
 			throw new IllegalArgumentException("Port out of range");
 		}
-		
+
 		this.host = host;
 		this.port = port;
 		this.jarFiles = jarFiles;
 	}
-	
-	
+
+	// ------------------------------------------------------------------------
+
 	@Override
 	public JobExecutionResult execute(String jobName) throws Exception {
+		ensureExecutorCreated();
+
 		Plan p = createProgramPlan(jobName);
-		
-		PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, clientConfiguration, jarFiles);
-		executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled());
+		p.setJobId(jobID);
+		p.setSessionTimeout(sessionTimeout);
 
-		this.lastJobExecutionResult = executor.executePlan(p);
-		return this.lastJobExecutionResult;
+		JobExecutionResult result = executor.executePlan(p);
+
+		this.lastJobExecutionResult = result;
+		return result;
 	}
-	
+
 	@Override
 	public String getExecutionPlan() throws Exception {
-		Plan p = createProgramPlan("unnamed", false);
-		p.setDefaultParallelism(getParallelism());
-		registerCachedFilesWithPlan(p);
+		Plan p = createProgramPlan("plan", false);
+
+		// make sure that we do not start a new executor here
+		// if one is running, fine; if not, we create a lightweight local executor and let it
+		// generate the plan
+		if (executor != null) {
+			return executor.getOptimizerPlanAsJSON(p);
+		}
+		else {
+			PlanExecutor le = PlanExecutor.createLocalExecutor(null);
+			return le.getOptimizerPlanAsJSON(p);
+		}
+	}
+
+	@Override
+	public void startNewSession() throws Exception {
+		dispose();
+		jobID = JobID.generate();
+		installShutdownHook();
+	}
+	
+	private void ensureExecutorCreated() throws Exception {
+		if (executor == null) {
+			executor = PlanExecutor.createRemoteExecutor(host, port, clientConfiguration, jarFiles);
+			executor.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
+		}
 		
-		PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, clientConfiguration, jarFiles);
-		return executor.getOptimizerPlanAsJSON(p);
+		// if we are using sessions, we keep the executor running
+		if (getSessionTimeout() > 0 && !executor.isRunning()) {
+			executor.start();
+			installShutdownHook();
+		}
 	}
 
+	// ------------------------------------------------------------------------
+	//  Dispose
+	// ------------------------------------------------------------------------
+
+	protected void dispose() {
+		// Remove shutdown hook to prevent resource leaks, unless this is invoked by the
+		// shutdown hook itself
+		if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
+			try {
+				Runtime.getRuntime().removeShutdownHook(shutdownHook);
+			}
+			catch (IllegalStateException e) {
+				// race, JVM is in shutdown already, we can safely ignore this
+			}
+			catch (Throwable t) {
+				LOG.warn("Exception while unregistering the cleanup shutdown hook.", t);
+			}
+		}
+		
+		try {
+			PlanExecutor executor = this.executor;
+			if (executor != null) {
+				executor.endSession(jobID);
+				executor.stop();
+			}
+		}
+		catch (Exception e) {
+			throw new RuntimeException("Failed to dispose the session.", e);
+		}
+	}
+	
 	@Override
 	public String toString() {
 		return "Remote Environment (" + this.host + ":" + this.port + " - parallelism = " +
 				(getParallelism() == -1 ? "default" : getParallelism()) + ") : " + getIdString();
 	}
-
+	
 	public void setClientConfiguration(Configuration clientConfiguration) {
 		this.clientConfiguration = clientConfiguration;
 	}
+	
+	// ------------------------------------------------------------------------
+	//  Shutdown hooks and reapers
+	// ------------------------------------------------------------------------
+
+	private void installShutdownHook() {
+		if (shutdownHook == null) {
+			Thread shutdownHook = new Thread(new Runnable() {
+				@Override
+				public void run() {
+					try {
+						dispose();
+					}
+					catch (Throwable t) {
+						LOG.error("Error in cleanup of RemoteEnvironment during JVM shutdown: " + t.getMessage(), t);
+					}
+				}
+			});
+	
+			try {
+				// Add JVM shutdown hook that eagerly disposes the session on JVM exit
+				Runtime.getRuntime().addShutdownHook(shutdownHook);
+				this.shutdownHook = shutdownHook;
+			}
+			catch (IllegalStateException e) {
+				// JVM is already shutting down. No need for a shutdown hook.
+			}
+			catch (Throwable t) {
+				LOG.error("Cannot register shutdown hook that cleanly disposes the session.", t);
+			}
+		}
+	}
 }

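The shutdown-hook handling in RemoteEnvironment above follows a common install/remove idiom: register the hook lazily, never deregister it from within the hook thread itself, and tolerate the IllegalStateException race when the JVM is already shutting down. A condensed sketch of that idiom with hypothetical names:

    final class DisposableSession {
        private Thread shutdownHook;

        synchronized void install() {
            if (shutdownHook == null) {
                Thread hook = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        dispose();
                    }
                });
                try {
                    Runtime.getRuntime().addShutdownHook(hook);
                    shutdownHook = hook;
                } catch (IllegalStateException e) {
                    // JVM already shutting down; nothing to register
                }
            }
        }

        synchronized void dispose() {
            // never deregister from inside the hook thread itself
            if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
                try {
                    Runtime.getRuntime().removeShutdownHook(shutdownHook);
                } catch (IllegalStateException e) {
                    // race with JVM shutdown; safe to ignore
                }
                shutdownHook = null;
            }
            // ... end the session and stop the executor here ...
        }
    }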
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java
index d56be87..311c286 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java
@@ -27,9 +27,10 @@ import org.apache.flink.util.Visitor;
 /**
  * The execution plan generated by the Optimizer. It contains {@link PlanNode}s
  * and {@link Channel}s that describe exactly how the program should be executed.
- * It defines all ship strategies (local pipe, shuffle, broadcast, rebalance), all
- * operator strategies (sorting-merge join, hash join, sorted grouping, ...),
- * and the data exchange modes (batched, pipelined).
+ * 
+ * <p>The optimized plan defines all ship strategies (local pipe, shuffle, broadcast, rebalance),
+ * all operator strategies (sorting-merge join, hash join, sorted grouping, ...),
+ * and the data exchange modes (batched, pipelined).</p>
  */
 public class OptimizedPlan implements FlinkPlan, Visitable<PlanNode>  {
 	
@@ -42,7 +43,7 @@ public class OptimizedPlan implements FlinkPlan, Visitable<PlanNode>  {
 	/** All nodes in the optimizer plan. */
 	private final Collection<PlanNode> allNodes;
 	
-	/** The original program. */
+	/** The original program (as a dataflow plan). */
 	private final Plan originalProgram;
 
 	/** Name of the job */
@@ -104,11 +105,11 @@ public class OptimizedPlan implements FlinkPlan, Visitable<PlanNode>  {
 	}
 	
 	/**
-	 * Gets the original program plan from which this optimized plan was created.
+	 * Gets the original program's dataflow plan from which this optimized plan was created.
 	 * 
-	 * @return The original program plan.
+	 * @return The original program's dataflow plan.
 	 */
-	public Plan getOriginalPactPlan() {
+	public Plan getOriginalPlan() {
 		return this.originalProgram;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index 0cbcea8..3567fad 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -21,6 +21,7 @@ package org.apache.flink.optimizer.plantranslate;
 import com.fasterxml.jackson.core.JsonFactory;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.aggregators.AggregatorRegistry;
 import org.apache.flink.api.common.aggregators.AggregatorWithName;
 import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
@@ -159,9 +160,21 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 	 * {@link org.apache.flink.runtime.jobgraph.JobGraph}.
 	 * 
 	 * @param program Optimized plan that is translated into a JobGraph.
-	 * @return JobGraph generated frmo the plan.
+	 * @return JobGraph generated from the plan.
 	 */
 	public JobGraph compileJobGraph(OptimizedPlan program) {
+		return compileJobGraph(program, null);
+	}
+	
+	public JobGraph compileJobGraph(OptimizedPlan program, JobID jobId) {
+		if (program == null) {
+			throw new NullPointerException();
+		}
+		
+		if (jobId == null) {
+			jobId = JobID.generate();
+		}
+
 		this.vertices = new HashMap<PlanNode, JobVertex>();
 		this.chainedTasks = new HashMap<PlanNode, TaskInChain>();
 		this.chainedTasksInSequence = new ArrayList<TaskInChain>();
@@ -204,9 +217,10 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 		// ----------- finalize the job graph -----------
 		
 		// create the job graph object
-		JobGraph graph = new JobGraph(program.getJobName());
-		graph.setNumberOfExecutionRetries(program.getOriginalPactPlan().getNumberOfExecutionRetries());
+		JobGraph graph = new JobGraph(jobId, program.getJobName());
+		graph.setNumberOfExecutionRetries(program.getOriginalPlan().getNumberOfExecutionRetries());
 		graph.setAllowQueuedScheduling(false);
+		graph.setSessionTimeout(program.getOriginalPlan().getSessionTimeout());
 
 		// add vertices to the graph
 		for (JobVertex vertex : this.vertices.values()) {
@@ -219,13 +233,13 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 		}
 
 		// add registered cache file into job configuration
-		for (Entry<String, DistributedCacheEntry> e : program.getOriginalPactPlan().getCachedFiles()) {
+		for (Entry<String, DistributedCacheEntry> e : program.getOriginalPlan().getCachedFiles()) {
 			DistributedCache.writeFileInfoToConfig(e.getKey(), e.getValue(), graph.getJobConfiguration());
 		}
 
 		try {
 			InstantiationUtil.writeObjectToConfig(
-					program.getOriginalPactPlan().getExecutionConfig(),
+					program.getOriginalPlan().getExecutionConfig(),
 					graph.getJobConfiguration(),
 					ExecutionConfig.CONFIG_KEY);
 		} catch (IOException e) {

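The new two-argument compileJobGraph is what lets a session attach several incremental submissions to the same job: the caller pins the JobID instead of letting a random one be generated per compilation. A sketch under the assumption that the two OptimizedPlan values come from separate optimizer runs over a growing dataflow:

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.optimizer.plan.OptimizedPlan;
    import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
    import org.apache.flink.runtime.jobgraph.JobGraph;

    public class SessionCompile {
        static void compileSession(OptimizedPlan optimizedPart1, OptimizedPlan optimizedPart2) {
            JobID sessionId = JobID.generate();
            JobGraphGenerator generator = new JobGraphGenerator();

            // first increment of the session's dataflow
            JobGraph part1 = generator.compileJobGraph(optimizedPart1, sessionId);

            // a later increment reuses the same ID, so the JobManager attaches it
            // to the existing ExecutionGraph instead of creating a new one
            JobGraph part2 = generator.compileJobGraph(optimizedPart2, sessionId);
        }
    }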
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/JavaApiPostPass.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/JavaApiPostPass.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/JavaApiPostPass.java
index a685ff4..3180ab4 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/JavaApiPostPass.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/JavaApiPostPass.java
@@ -75,7 +75,7 @@ public class JavaApiPostPass implements OptimizerPostPass {
 	@Override
 	public void postPass(OptimizedPlan plan) {
 
-		executionConfig = plan.getOriginalPactPlan().getExecutionConfig();
+		executionConfig = plan.getOriginalPlan().getExecutionConfig();
 
 		for (SinkPlanNode sink : plan.getDataSinks()) {
 			traverse(sink);

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index 9d64866..c51bc7c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -122,7 +122,6 @@ public class JobClient {
 	 * @param jobGraph    JobGraph describing the Flink job
 	 * @param timeout     Timeout for futures
 	 * @param sysoutLogUpdates prints log updates to system out if true
-	 * @param userCodeClassloader class loader to be used for deserialization
 	 * @return The job execution result
 	 * @throws org.apache.flink.runtime.client.JobExecutionException Thrown if the job
 	 *                                                               execution fails.
@@ -133,7 +132,7 @@ public class JobClient {
 			JobGraph jobGraph,
 			FiniteDuration timeout,
 			boolean sysoutLogUpdates,
-			ClassLoader userCodeClassloader) throws JobExecutionException {
+			ClassLoader classLoader) throws JobExecutionException {
 
 		checkNotNull(actorSystem, "The actorSystem must not be null.");
 		checkNotNull(jobManagerGateway, "The jobManagerGateway must not be null.");
@@ -182,7 +181,7 @@ public class JobClient {
 			SerializedJobExecutionResult result = ((JobManagerMessages.JobResultSuccess) answer).result();
 			if (result != null) {
 				try {
-					return result.toJobExecutionResult(userCodeClassloader);
+					return result.toJobExecutionResult(classLoader);
 				}
 				catch (Throwable t) {
 					throw new JobExecutionException(jobGraph.getJobID(),
@@ -199,7 +198,7 @@ public class JobClient {
 
 			SerializedThrowable serThrowable = ((JobManagerMessages.JobResultFailure) answer).cause();
 			if (serThrowable != null) {
-				Throwable cause = serThrowable.deserializeError(userCodeClassloader);
+				Throwable cause = serThrowable.deserializeError(classLoader);
 				if (cause instanceof JobExecutionException) {
 					throw (JobExecutionException) cause;
 				}
@@ -230,7 +229,7 @@ public class JobClient {
 			ActorGateway jobManagerGateway,
 			JobGraph jobGraph,
 			FiniteDuration timeout,
-			ClassLoader userCodeClassloader) throws JobExecutionException {
+			ClassLoader classLoader) throws JobExecutionException {
 
 		checkNotNull(jobManagerGateway, "The jobManagerGateway must not be null.");
 		checkNotNull(jobGraph, "The jobGraph must not be null.");
@@ -269,7 +268,7 @@ public class JobClient {
 		else if (result instanceof JobManagerMessages.JobResultFailure) {
 			try {
 				SerializedThrowable t = ((JobManagerMessages.JobResultFailure) result).cause();
-				throw t.deserializeError(userCodeClassloader);
+				throw t.deserializeError(classLoader);
 			}
 			catch (JobExecutionException e) {
 				throw e;

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java
index 7871a8c..7c6a4af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.JobID;
 
 /**
  * This exception is the base exception for all exceptions that denote any failure during
- * teh execution of a job. The JobExecutionException and its subclasses are thrown by
+ * the execution of a job. The JobExecutionException and its subclasses are thrown by
  * the {@link JobClient}.
  */
 public class JobExecutionException extends Exception {


[2/4] flink git commit: [FLINK-2097][core] implement a job session management

Posted by mx...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 70a49fd..889ce43 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -178,6 +178,9 @@ public class ExecutionGraph implements Serializable {
 
 	/** Flag that indicate whether the executed dataflow should be periodically snapshotted */
 	private boolean snapshotCheckpointsEnabled;
+
+	/** Flag to indicate whether the Graph has been archived */
+	private boolean isArchived = false;
 		
 
 	// ------ Execution status and progress. These values are volatile, and accessed under the lock -------
@@ -326,6 +329,9 @@ public class ExecutionGraph implements Serializable {
 		return scheduleMode;
 	}
 
+	public boolean isArchived() {
+		return isArchived;
+	}
 	public void enableSnapshotCheckpointing(
 			long interval,
 			long checkpointTimeout,
@@ -779,6 +785,8 @@ public class ExecutionGraph implements Serializable {
 		requiredJarFiles.clear();
 		jobStatusListenerActors.clear();
 		executionListenerActors.clear();
+
+		isArchived = true;
 	}
 
 	public ExecutionConfig getExecutionConfig() {

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 7677a1b..e4a0209 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -70,15 +70,19 @@ public class JobGraph implements Serializable {
 
 	/** Set of blob keys identifying the JAR files required to run this job. */
 	private final List<BlobKey> userJarBlobKeys = new ArrayList<BlobKey>();
-	
-	/** ID of this job. */
+
+	/** ID of this job. May be set if specific job id is desired (e.g. session management) */
 	private final JobID jobID;
 
 	/** Name of this job. */
-	private String jobName;
+	private final String jobName;
 	
 	/** The number of times that failed tasks should be re-executed */
 	private int numExecutionRetries;
+
+	/** The number of seconds after which the corresponding ExecutionGraph is removed at the
+	 * job manager after it has been executed. */
+	private long sessionTimeout = 0;
 	
 	/** flag to enable queued scheduling */
 	private boolean allowQueuedScheduling;
@@ -86,7 +90,7 @@ public class JobGraph implements Serializable {
 	/** The mode in which the job is scheduled */
 	private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES;
 	
-	/** The settings for asynchronous snapshotting */
+	/** The settings for asynchronous snapshots */
 	private JobSnapshottingSettings snapshotSettings;
 
 	// --------------------------------------------------------------------------------------------
@@ -99,19 +103,19 @@ public class JobGraph implements Serializable {
 	}
 
 	/**
-	 * Constructs a new job graph with the given name and a random job ID.
+	 * Constructs a new job graph with the given name and a randomly generated job ID.
 	 * 
 	 * @param jobName The name of the job
 	 */
 	public JobGraph(String jobName) {
 		this(null, jobName);
 	}
-	
+
 	/**
-	 * Constructs a new job graph with the given name and a random job ID.
+	 * Constructs a new job graph with the given name, and a random job ID if {@code null} is supplied as the id.
 	 * 
-	 * @param jobId The id of the job
-	 * @param jobName The name of the job
+	 * @param jobId The id of the job. A random ID is generated, if {@code null} is passed.
+	 * @param jobName The name of the job.
 	 */
 	public JobGraph(JobID jobId, String jobName) {
 		this.jobID = jobId == null ? new JobID() : jobId;
@@ -119,7 +123,7 @@ public class JobGraph implements Serializable {
 	}
 	
 	/**
+	 * Constructs a new job graph with no name and a randomly generated job ID.
+	 * Constructs a new job graph with no name and a random job ID if null supplied as an id.
 	 * 
 	 * @param vertices The vertices to add to the graph.
 	 */
@@ -138,9 +142,9 @@ public class JobGraph implements Serializable {
 	}
 	
 	/**
-	 * Constructs a new job graph with the given name and a random job ID.
+	 * Constructs a new job graph with the given name, and a random job ID if {@code null} is supplied as the id.
 	 * 
-	 * @param jobId The id of the job.
+	 * @param jobId The id of the job. A random ID is generated, if {@code null} is passed.
 	 * @param jobName The name of the job.
 	 * @param vertices The vertices to add to the graph.
 	 */
@@ -162,7 +166,7 @@ public class JobGraph implements Serializable {
 	public JobID getJobID() {
 		return this.jobID;
 	}
-	
+
 	/**
 	 * Returns the name assigned to the job graph.
 	 * 
@@ -173,9 +177,10 @@ public class JobGraph implements Serializable {
 	}
 
 	/**
-	 * Returns the configuration object for this job if it is set.
+	 * Returns the configuration object for this job. Job-wide parameters should be set into that
+	 * configuration object.
 	 * 
-	 * @return the configuration object for this job, or <code>null</code> if it is not set
+	 * @return The configuration object for this job.
 	 */
 	public Configuration getJobConfiguration() {
 		return this.jobConfiguration;
@@ -190,7 +195,8 @@ public class JobGraph implements Serializable {
 	 */
 	public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
 		if (numberOfExecutionRetries < -1) {
-			throw new IllegalArgumentException("The number of execution retries must be non-negative, or -1 (use system default)");
+			throw new IllegalArgumentException(
+					"The number of execution retries must be non-negative, or -1 (use system default)");
 		}
 		this.numExecutionRetries = numberOfExecutionRetries;
 	}
@@ -205,11 +211,29 @@ public class JobGraph implements Serializable {
 	public int getNumberOfExecutionRetries() {
 		return numExecutionRetries;
 	}
+
+	/**
+	 * Gets the timeout after which the corresponding ExecutionGraph is removed from the
+	 * job manager once the job has finished.
+	 * @return The session timeout, in seconds.
+	 */
+	public long getSessionTimeout() {
+		return sessionTimeout;
+	}
+
+	/**
+	 * Sets the timeout of the session in seconds. The timeout specifies how long a job will be kept
+	 * in the job manager after it finishes.
+	 * @param sessionTimeout The timeout in seconds
+	 */
+	public void setSessionTimeout(long sessionTimeout) {
+		this.sessionTimeout = sessionTimeout;
+	}
 	
 	public void setAllowQueuedScheduling(boolean allowQueuedScheduling) {
 		this.allowQueuedScheduling = allowQueuedScheduling;
 	}
-	
+
 	public boolean getAllowQueuedScheduling() {
 		return allowQueuedScheduling;
 	}

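For illustration, the session timeout travels with the JobGraph itself, and pinning the JobID of a later graph attaches it to the same session. A minimal sketch (vertex setup omitted; the 30-second timeout is arbitrary):

    import org.apache.flink.runtime.jobgraph.JobGraph;

    public class SessionTimeoutExample {
        public static void main(String[] args) {
            JobGraph graph = new JobGraph("incremental job");
            graph.setSessionTimeout(30);  // keep the ExecutionGraph for 30s after the job finishes

            // a later submission pins the same ID to attach to the session
            JobGraph part2 = new JobGraph(graph.getJobID(), "incremental job, part 2");
            part2.setSessionTimeout(30);
        }
    }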
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
index c7ea06e..60aadf5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
@@ -187,7 +187,7 @@ public class TaskExecutionState implements java.io.Serializable {
 	
 	@Override
 	public String toString() {
-		return String.format("TaskState jobId=%s, executionId=%s, state=%s, error=%s", 
+		return String.format("TaskState jobId=%s, jobID=%s, state=%s, error=%s",
 				jobID, executionId, executionState,
 				throwable == null ? "(null)" : throwable.toString());
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
index 26d7272..75ad20f 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmanager
 
 import akka.actor.ActorRef
 
+
 /**
  * Utility class to store job information on the [[JobManager]]. The JobInfo stores which actor
  * submitted the job, when the start time and, if already terminated, the end time was.
@@ -29,7 +30,15 @@ import akka.actor.ActorRef
  * @param client Actor which submitted the job
  * @param start Starting time
  */
-class JobInfo(val client: ActorRef, val start: Long){
+class JobInfo(val client: ActorRef, val start: Long,
+              val sessionTimeout: Long) {
+
+  var sessionAlive = sessionTimeout > 0
+
+  var lastActive = 0L
+
+  setLastActive()
+
   var end: Long = -1
 
   def duration: Long = {
@@ -39,8 +48,13 @@ class JobInfo(val client: ActorRef, val start: Long){
       -1
     }
   }
+
+  def setLastActive() =
+    lastActive = System.currentTimeMillis()
 }
 
 object JobInfo{
-  def apply(client: ActorRef, start: Long) = new JobInfo(client, start)
+  def apply(client: ActorRef, start: Long,
+            sessionTimeout: Long) =
+    new JobInfo(client, start, sessionTimeout)
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index d93b2ed..444ab0b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -74,6 +74,8 @@ import scala.concurrent.duration._
 import scala.concurrent.forkjoin.ForkJoinPool
 import scala.language.postfixOps
 import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionContext.Implicits.global
+
 
 /**
  * The job manager is responsible for receiving Flink jobs, scheduling the tasks, gathering the
@@ -121,7 +123,7 @@ class JobManager(
 
   override val log = Logger(getClass)
 
-  /** List of current jobs running jobs */
+  /** Jobs that are either running or not yet archived (i.e. whose session has not ended). */
   protected val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]()
 
   var leaderSessionID: Option[UUID] = None
@@ -426,7 +428,19 @@ class JobManager(
               }
             }
 
-            removeJob(jobID)
+
+            if (jobInfo.sessionAlive) {
+              jobInfo.setLastActive()
+              val lastActivity = jobInfo.lastActive
+              context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout seconds) {
+                // remove only if no activity occurred in the meantime
+                if (lastActivity == jobInfo.lastActive) {
+                  removeJob(jobID)
+                }
+              }
+            } else {
+              removeJob(jobID)
+            }
 
           }
         case None =>
@@ -555,6 +569,18 @@ class JobManager(
     case RequestJobManagerStatus =>
       sender() ! decorateMessage(JobManagerStatusAlive)
 
+    case RemoveCachedJob(jobID) =>
+      currentJobs.get(jobID) match {
+        case Some((graph, info)) =>
+          if (graph.getState.isTerminalState) {
+            removeJob(graph.getJobID)
+          } else {
+            // triggers removal upon completion of job
+            info.sessionAlive = false
+          }
+        case None =>
+      }
+
     case Disconnect(msg) =>
       val taskManager = sender()
 
@@ -624,19 +650,26 @@ class JobManager(
         }
 
         // see if there already exists an ExecutionGraph for the corresponding job ID
-        executionGraph = currentJobs.getOrElseUpdate(
-          jobGraph.getJobID,
-          (new ExecutionGraph(
-            executionContext,
-            jobGraph.getJobID,
-            jobGraph.getName,
-            jobGraph.getJobConfiguration(),
-            timeout,
-            jobGraph.getUserJarBlobKeys(),
-            userCodeLoader),
-            JobInfo(client, System.currentTimeMillis())
-          )
-        )._1
+        executionGraph = currentJobs.get(jobGraph.getJobID) match {
+          case Some((graph, jobInfo)) =>
+            jobInfo.setLastActive()
+            graph
+          case None =>
+            val graph = new ExecutionGraph(
+              executionContext,
+              jobGraph.getJobID,
+              jobGraph.getName,
+              jobGraph.getJobConfiguration,
+              timeout,
+              jobGraph.getUserJarBlobKeys,
+              userCodeLoader)
+            val jobInfo = JobInfo(
+              client,
+              System.currentTimeMillis(),
+              jobGraph.getSessionTimeout)
+            currentJobs.put(jobGraph.getJobID, (graph, jobInfo))
+            graph
+        }
 
         // configure the execution graph
         val jobNumberRetries = if (jobGraph.getNumberOfExecutionRetries() >= 0) {
@@ -990,25 +1023,26 @@ class JobManager(
    * @param jobID ID of the job to remove and archive
    */
   private def removeJob(jobID: JobID): Unit = {
-    currentJobs.remove(jobID) match {
-      case Some((eg, _)) =>
-        try {
-          eg.prepareForArchiving()
-
-          archive ! decorateMessage(ArchiveExecutionGraph(jobID, eg))
-        } catch {
-          case t: Throwable => log.error(s"Could not prepare the execution graph $eg for " +
-            "archiving.", t)
-        }
+    currentJobs.synchronized {
+      currentJobs.remove(jobID) match {
+        case Some((eg, _)) =>
+          try {
+            eg.prepareForArchiving()
+            archive ! decorateMessage(ArchiveExecutionGraph(jobID, eg))
+          } catch {
+            case t: Throwable => log.error(s"Could not prepare the execution graph $eg for " +
+              "archiving.", t)
+          }
 
-      case None =>
-    }
+        case None =>
+      }
 
-    try {
-      libraryCacheManager.unregisterJob(jobID)
-    } catch {
-      case t: Throwable =>
-        log.error(s"Could not properly unregister job $jobID form the library cache.", t)
+      try {
+        libraryCacheManager.unregisterJob(jobID)
+      } catch {
+        case t: Throwable =>
+          log.error(s"Could not properly unregister job $jobID form the library cache.", t)
+      }
     }
   }
 

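The deferred removal in JobManager.scala above hinges on comparing a snapshot of lastActive with its current value when the timer fires, so any session activity in between implicitly cancels the removal. The same pattern in plain Java, as a hypothetical sketch with a ScheduledExecutorService in place of the Akka scheduler:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    final class SessionReaper {
        private final ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor();
        private volatile long lastActive = System.currentTimeMillis();

        void touch() {
            // called whenever the session shows activity (e.g. a new submission)
            lastActive = System.currentTimeMillis();
        }

        void scheduleRemoval(long timeoutSeconds, final Runnable removeJob) {
            final long snapshot = lastActive;
            scheduler.schedule(new Runnable() {
                @Override
                public void run() {
                    // remove only if no activity occurred since scheduling
                    if (snapshot == lastActive) {
                        removeJob.run();
                    }
                }
            }, timeoutSeconds, TimeUnit.SECONDS);
        }
    }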
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index d3fc8b1..bef52e0 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -80,10 +80,13 @@ class MemoryArchivist(private val max_entries: Int)
   override def handleMessage: Receive = {
     
     /* Receive Execution Graph to archive */
-    case ArchiveExecutionGraph(jobID, graph) => 
-      // wrap graph inside a soft reference
-      graphs.update(jobID, graph)
-
+    case ArchiveExecutionGraph(jobID, graph) =>
+      // Keep LRU order in case we overwrite a graph (from multiple job submissions in one session).
+      // This deletes the old ExecutionGraph with this JobID from the history but avoids storing
+      // redundant ExecutionGraphs.
+      // TODO Allow ExecutionGraphs with the same jobID to be stored and displayed in web interface
+      graphs.remove(jobID)
+      graphs.put(jobID, graph)
       // update job counters
       graph.getState match {
         case JobStatus.FINISHED => finishedCnt += 1

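The remove-then-put in the archivist above refreshes the entry's position in the map's iteration order, so a session that resubmits under the same JobID does not let its graph age out prematurely. In plain Java the equivalent can be sketched with a bounded LinkedHashMap (the max-entries bound mirrors the Scala class's constructor argument):

    import java.util.LinkedHashMap;
    import java.util.Map;

    final class GraphArchive<K, V> extends LinkedHashMap<K, V> {
        private final int maxEntries;

        GraphArchive(int maxEntries) {
            super(16, 0.75f, false);   // insertion order, matching the remove+put idiom
            this.maxEntries = maxEntries;
        }

        void archive(K jobId, V graph) {
            // re-inserting moves the entry to the end of the iteration
            // order, so the oldest graphs are evicted first
            remove(jobId);
            put(jobId, graph);
        }

        @Override
        protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
            return size() > maxEntries;
        }
    }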
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index d7bbb8d..3869392 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -275,6 +275,12 @@ object JobManagerMessages {
   case class JobNotFound(jobID: JobID) extends JobResponse with JobStatusResponse
 
   /**
+   * Removes the job belonging to the job identifier from the job manager and archives it.
+   * @param jobID The job identifier
+   */
+  case class RemoveCachedJob(jobID: JobID)
+
+  /**
    * Requests the instances of all registered task managers.
    */
   case object RequestRegisteredTaskManagers

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index bbd011a..839193b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -416,8 +416,8 @@ abstract class FlinkMiniCluster(
        jobManagerGateway,
        jobGraph,
        timeout,
-      printUpdates,
-      this.getClass().getClassLoader())
+       printUpdates,
+       this.getClass.getClassLoader())
     } finally {
        if(!useSingleActorSystem) {
          // we have to shutdown the just created actor system
@@ -440,7 +440,10 @@ abstract class FlinkMiniCluster(
         )
     }
 
-    JobClient.submitJobDetached(jobManagerGateway, jobGraph, timeout, getClass().getClassLoader())
+    JobClient.submitJobDetached(jobManagerGateway,
+      jobGraph,
+      timeout,
+      this.getClass.getClassLoader())
 
     new JobSubmissionResult(jobGraph.getJobID)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index fdbffaa..5753cde 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -107,7 +107,8 @@ public class PartialConsumePipelinedResultTest {
 				flink.getLeaderGateway(TestingUtils.TESTING_DURATION()),
 				jobGraph,
 				TestingUtils.TESTING_DURATION(),
-				false, this.getClass().getClassLoader());
+				false,
+				this.getClass().getClassLoader());
 	}
 
 	// ---------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
index 5594bfe..075c1c2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
@@ -52,7 +52,7 @@ import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandP
 import static org.apache.flink.runtime.testutils.CommonTestUtils.isProcessAlive;
 
 /**
- * Tests that the JobManager process properly exits when the JobManager actor dies.
+ * Tests that the TaskManager process properly exits when the TaskManager actor dies.
  */
 public class TaskManagerProcessReapingTest {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 74b7680..3a252f8 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.jobmanager
 
+
 import Tasks._
 import akka.actor.ActorSystem
 import akka.actor.Status.{Success, Failure}
@@ -27,13 +28,13 @@ import org.apache.flink.runtime.akka.ListeningBehaviour
 import org.apache.flink.runtime.client.JobExecutionException
 import org.apache.flink.runtime.jobgraph.{JobVertex, DistributionPattern, JobGraph, ScheduleMode}
 import org.apache.flink.runtime.messages.JobManagerMessages._
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
-import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
-import org.apache.flink.runtime.util.SerializedThrowable
+import org.apache.flink.runtime.testingUtils.{TestingUtils, ScalaTestingUtils}
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
+import org.apache.flink.runtime.jobmanager.scheduler.{NoResourceAvailableException, SlotSharingGroup}
+
 import org.junit.runner.RunWith
+import org.scalatest.{Matchers, BeforeAndAfterAll, WordSpecLike}
 import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
-import org.apache.flink.runtime.jobmanager.scheduler.{NoResourceAvailableException, SlotSharingGroup}
 
 import scala.concurrent.Await
 import scala.concurrent.duration._
@@ -617,6 +618,121 @@ class JobManagerITCase(_system: ActorSystem)
         cluster.stop()
       }
     }
+
+    "remove execution graphs when the client ends the session explicitly" in {
+      val vertex = new JobVertex("Test Vertex")
+      vertex.setInvokableClass(classOf[NoOpInvokable])
+
+      val jobGraph1 = new JobGraph("Test Job", vertex)
+
+      val slowVertex = new WaitingOnFinalizeJobVertex("Long running Vertex", 2000)
+      slowVertex.setInvokableClass(classOf[NoOpInvokable])
+
+      val jobGraph2 = new JobGraph("Long running Job", slowVertex)
+
+      val cluster = TestingUtils.startTestingCluster(1)
+      val jm = cluster.getLeaderGateway(1 seconds)
+
+      try {
+        within(TestingUtils.TESTING_DURATION) {
+          /* jobgraph1 is removed after being terminated */
+          jobGraph1.setSessionTimeout(9999)
+          jm.tell(SubmitJob(jobGraph1, ListeningBehaviour.EXECUTION_RESULT), self)
+          expectMsg(JobSubmitSuccess(jobGraph1.getJobID))
+          expectMsgType[JobResultSuccess]
+
+          // should not be archived yet
+          jm.tell(RequestExecutionGraph(jobGraph1.getJobID), self)
+          var cachedGraph = expectMsgType[ExecutionGraphFound].executionGraph
+          assert(!cachedGraph.isArchived)
+
+          jm.tell(RemoveCachedJob(jobGraph1.getJobID), self)
+
+          jm.tell(RequestExecutionGraph(jobGraph1.getJobID), self)
+          cachedGraph = expectMsgType[ExecutionGraphFound].executionGraph
+          assert(cachedGraph.isArchived)
+
+          /* jobgraph2 is removed while running */
+          jobGraph2.setSessionTimeout(9999)
+          jm.tell(SubmitJob(jobGraph2, ListeningBehaviour.EXECUTION_RESULT), self)
+          expectMsg(JobSubmitSuccess(jobGraph2.getJobID))
+
+          // job still running
+          jm.tell(RemoveCachedJob(jobGraph2.getJobID), self)
+
+          expectMsgType[JobResultSuccess]
+
+          // should be archived!
+          jm.tell(RequestExecutionGraph(jobGraph2.getJobID), self)
+          cachedGraph = expectMsgType[ExecutionGraphFound].executionGraph
+          assert(cachedGraph.isArchived)
+        }
+      } finally {
+        cluster.stop()
+      }
+    }
+
+    "remove execution graphs when when the client's session times out" in {
+      val vertex = new JobVertex("Test Vertex")
+      vertex.setParallelism(1)
+      vertex.setInvokableClass(classOf[NoOpInvokable])
+
+      val jobGraph = new JobGraph("Test Job", vertex)
+
+      val cluster = TestingUtils.startTestingCluster(1)
+      val jm = cluster.getLeaderGateway(1 seconds)
+
+      try {
+        within(TestingUtils.TESTING_DURATION) {
+          // try multiple times in case of flaky environments
+          var testSucceeded = false
+          var numTries = 0
+          while(!testSucceeded && numTries < 10) {
+            try {
+              // should be removed immediately
+              jobGraph.setSessionTimeout(0)
+              jm.tell(SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), self)
+              expectMsg(JobSubmitSuccess(jobGraph.getJobID))
+              expectMsgType[JobResultSuccess]
+
+              jm.tell(RequestExecutionGraph(jobGraph.getJobID), self)
+              val cachedGraph2 = expectMsgType[ExecutionGraphFound].executionGraph
+              assert(cachedGraph2.isArchived)
+
+              // removed after 2 seconds
+              jobGraph.setSessionTimeout(2)
+
+              jm.tell(SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), self)
+              expectMsg(JobSubmitSuccess(jobGraph.getJobID))
+              expectMsgType[JobResultSuccess]
+
+              // should not be archived yet
+              jm.tell(RequestExecutionGraph(jobGraph.getJobID), self)
+              val cachedGraph = expectMsgType[ExecutionGraphFound].executionGraph
+              assert(!cachedGraph.isArchived)
+
+              // wait until graph is archived
+              Thread.sleep(3000)
+
+              jm.tell(RequestExecutionGraph(jobGraph.getJobID), self)
+              val graph = expectMsgType[ExecutionGraphFound].executionGraph
+              assert(graph.isArchived)
+
+              testSucceeded = true
+            } catch {
+              case e: Throwable =>
+                numTries += 1
+            }
+          }
+          if(!testSucceeded) {
+            fail("Test case failed after " + numTries + " probes.")
+          }
+        }
+      } finally {
+        cluster.stop()
+      }
+    }
+
   }
 
   class WaitingOnFinalizeJobVertex(name: String, val waitingTime: Long) extends JobVertex(name){

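In short, the timeout semantics the two new test cases above pin down, as a sketch
(jobGraph stands for any JobGraph instance):

    jobGraph.setSessionTimeout(0);     // ExecutionGraph is archived as soon as the job finishes
    jobGraph.setSessionTimeout(2);     // kept for roughly two seconds after the job, then archived
    jobGraph.setSessionTimeout(9999);  // kept until an explicit RemoveCachedJob message arrives
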
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 5265134..745b0d3 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -22,7 +22,7 @@ import java.util.UUID
 import com.esotericsoftware.kryo.Serializer
 import org.apache.flink.api.common.io.{FileInputFormat, InputFormat}
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.common.{ExecutionConfig, JobExecutionResult}
+import org.apache.flink.api.common.{JobID, ExecutionConfig, JobExecutionResult}
 import org.apache.flink.api.java.io._
 import org.apache.flink.api.java.operators.DataSource
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
@@ -131,7 +131,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
    * Gets the UUID by which this environment is identified. The UUID sets the execution context
    * in the cluster or local environment.
    */
-  def getId: UUID = {
+  def getId: JobID = {
     javaEnv.getId
   }
 
@@ -148,6 +148,33 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
   }
 
   /**
+   * Starts a new session, discarding all intermediate results.
+   */
+  def startNewSession() {
+    javaEnv.startNewSession()
+  }
+
+  /**
+   * Sets the session timeout, which defines for how long the intermediate results
+   * of a job are held. The updated timeout only takes effect for future executions.
+   * @param timeout The timeout in seconds.
+   */
+  def setSessionTimeout(timeout: Long) {
+    javaEnv.setSessionTimeout(timeout)
+  }
+
+  /**
+   * Gets the session timeout for this environment. The session timeout defines how long
+   * the job and its intermediate results are kept after an execution, for future
+   * interactions.
+   *
+   * @return The session timeout, in seconds.
+   */
+  def getSessionTimeout: Long = {
+    javaEnv.getSessionTimeout
+  }
+
+  /**
    * Registers the given type with the serializer at the [[KryoSerializer]].
    *
    * Note that the serializer instance must be serializable (as defined by java.io.Serializable),

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
index f691806..e2d91af 100644
--- a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
+++ b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
@@ -19,12 +19,21 @@
 package org.apache.flink.api.avro;
 
 import java.io.File;
+import java.net.InetAddress;
 
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.client.CliFrontend;
+import org.apache.flink.client.RemoteExecutor;
 import org.apache.flink.client.program.Client;
+import org.apache.flink.client.program.JobWithJars;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.plan.FlinkPlan;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -32,35 +41,34 @@ import org.junit.Test;
 public class AvroExternalJarProgramITCase {
 
 	private static final String JAR_FILE = "target/maven-test-jar.jar";
-	
+
 	private static final String TEST_DATA_FILE = "/testdata.avro";
 
 	@Test
 	public void testExternalProgram() {
-		
+
 		ForkableFlinkMiniCluster testMiniCluster = null;
-		
+
 		try {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
 			testMiniCluster = new ForkableFlinkMiniCluster(config, false);
 			testMiniCluster.start();
-			
+
 			String jarFile = JAR_FILE;
 			String testData = getClass().getResource(TEST_DATA_FILE).toString();
-			
+
 			PackagedProgram program = new PackagedProgram(new File(jarFile), new String[] { testData });
 
+
 			config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
 			config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, testMiniCluster.getLeaderRPCPort());
-						
-			Client c = new Client(
-					config,
-					program.getUserCodeClassLoader(),
-					-1);
-
-			c.setPrintStatusDuringExecution(false);
-			c.run(program, 4, true);
+
+			Client client = new Client(config);
+
+			client.setPrintStatusDuringExecution(false);
+			client.runBlocking(program, 4);
+
 		}
 		catch (Throwable t) {
 			System.err.println(t.getMessage());
@@ -71,7 +79,9 @@ public class AvroExternalJarProgramITCase {
 			if (testMiniCluster != null) {
 				try {
 					testMiniCluster.stop();
-				} catch (Throwable t) {}
+				} catch (Throwable t) {
+					// ignore
+				}
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index ae2c047..29439f6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.JobWithJars;
 import org.apache.flink.client.program.ProgramInvocationException;
@@ -31,10 +30,12 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
+	
 	private static final Logger LOG = LoggerFactory.getLogger(RemoteStreamEnvironment.class);
 
 	private final String host;
@@ -117,17 +118,17 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 		configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, host);
 		configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port);
 
+		Client client;
 		try {
-			Client client = new Client(configuration, usercodeClassLoader, -1);
+			client = new Client(configuration);
 			client.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
-			
-			JobSubmissionResult result = client.run(jobGraph, true);
-			if (result instanceof JobExecutionResult) {
-				return (JobExecutionResult) result;
-			} else {
-				LOG.warn("The Client didn't return a JobExecutionResult");
-				return new JobExecutionResult(result.getJobID(), -1, null);
-			}
+		}
+		catch (Exception e) {
+			throw new ProgramInvocationException("Cannot establish connection to JobManager: " + e.getMessage(), e);
+		}
+
+		try {
+			return client.runBlocking(jobGraph, usercodeClassLoader);
 		}
 		catch (ProgramInvocationException e) {
 			throw e;
@@ -136,6 +137,9 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 			String term = e.getMessage() == null ? "." : (": " + e.getMessage());
 			throw new ProgramInvocationException("The program execution failed" + term, e);
 		}
+		finally {
+			client.shutdown();
+		}
 	}
 
 	@Override

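The pattern this hunk establishes, a Client that only manages the connection to the
JobManager and must be shut down explicitly, condenses to the following sketch
(config, jobGraph and userCodeClassLoader are placeholders):

    import org.apache.flink.api.common.JobExecutionResult;
    import org.apache.flink.client.program.Client;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.jobgraph.JobGraph;

    static JobExecutionResult run(Configuration config, JobGraph jobGraph,
            ClassLoader userCodeClassLoader) throws Exception {
        Client client = new Client(config);    // fails here if no connection can be set up
        client.setPrintStatusDuringExecution(true);
        try {
            return client.runBlocking(jobGraph, userCodeClassLoader);
        } finally {
            client.shutdown();                 // always release the connection
        }
    }
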
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index c2335d6..2b2a426 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -23,10 +23,12 @@ import java.util.List;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.client.program.Client;
+import org.apache.flink.client.program.JobWithJars;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,17 +36,25 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
 
 	private static final Logger LOG = LoggerFactory.getLogger(StreamContextEnvironment.class);
 
-	protected List<File> jars;
-	protected Client client;
+	private final List<File> jars;
+	
+	private final Client client;
+
+	private final ClassLoader userCodeClassLoader;
+	
 	private final boolean wait;
 
 	protected StreamContextEnvironment(Client client, List<File> jars, int parallelism, boolean wait) {
 		this.client = client;
 		this.jars = jars;
 		this.wait = wait;
+		
+		this.userCodeClassLoader = JobWithJars.buildUserCodeClassLoader(jars, getClass().getClassLoader());
+		
 		if (parallelism > 0) {
 			setParallelism(parallelism);
-		} else {
+		}
+		else {
 			// first check for old parallelism config key
 			setParallelism(GlobalConfiguration.getInteger(
 					ConfigConstants.DEFAULT_PARALLELISM_KEY_OLD,
@@ -73,16 +83,18 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
 
 		transformations.clear();
 
+		// attach all necessary jar files to the JobGraph
 		for (File file : jars) {
 			jobGraph.addJar(new Path(file.getAbsolutePath()));
 		}
 
-		JobSubmissionResult result = client.run(jobGraph, wait);
-		if(result instanceof JobExecutionResult) {
-			return (JobExecutionResult) result;
+		// execute the programs
+		if (wait) {
+			return client.runBlocking(jobGraph, userCodeClassLoader);
 		} else {
-			LOG.warn("The Client didn't return a JobExecutionResult");
-			return new JobExecutionResult(result.getJobID(), -1, null);
+			JobSubmissionResult result = client.runDetached(jobGraph, userCodeClassLoader);
+			LOG.warn("Job was executed in detached mode, the results will be available on completion.");
+			return JobExecutionResult.fromJobSubmissionResult(result);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index ff785c4..c50f23e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.api.common.io.InputFormat;
@@ -40,9 +41,9 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 import org.apache.flink.client.program.Client;
-import org.apache.flink.client.program.Client.OptimizerPlanEnvironment;
 import org.apache.flink.client.program.ContextEnvironment;
-import org.apache.flink.client.program.PackagedProgram.PreviewPlanEnvironment;
+import org.apache.flink.client.program.OptimizerPlanEnvironment;
+import org.apache.flink.client.program.PreviewPlanEnvironment;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.FileStateHandle;
@@ -1282,11 +1283,11 @@ public abstract class StreamExecutionEnvironment {
 	 *
 	 * @param jobName
 	 * 		Desired name of the job
-	 * @return The result of the job execution, containing elapsed time and
-	 * accumulators.
+	 * @return The result of the job execution: either a JobSubmissionResult or a
+	 * JobExecutionResult. The latter contains the elapsed time and accumulator results.
 	 * @throws Exception
 	 */
-	public abstract JobExecutionResult execute(String jobName) throws Exception;
+	public abstract JobSubmissionResult execute(String jobName) throws Exception;
 
 	/**
 	 * Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job.

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
index 8c1408e..e5ea2c5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
@@ -19,9 +19,8 @@ package org.apache.flink.streaming.api.environment;
 
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.client.program.Client;
-import org.apache.flink.client.program.Client.OptimizerPlanEnvironment;
-import org.apache.flink.client.program.PackagedProgram.PreviewPlanEnvironment;
+import org.apache.flink.client.program.OptimizerPlanEnvironment;
+import org.apache.flink.client.program.PreviewPlanEnvironment;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.streaming.api.graph.StreamGraph;
@@ -68,6 +67,6 @@ public class StreamPlanEnvironment extends StreamExecutionEnvironment {
 			((PreviewPlanEnvironment) env).setPreview(streamGraph.getStreamingPlanAsJSON());
 		}
 
-		throw new Client.ProgramAbortException();
+		throw new OptimizerPlanEnvironment.ProgramAbortException();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java
index 6d1b1c7..4c091e5 100644
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java
@@ -68,4 +68,9 @@ public class LocalTezEnvironment extends ExecutionEnvironment {
 		};
 		initializeContextEnvironment(factory);
 	}
+
+	@Override
+	public void startNewSession() throws Exception {
+		throw new UnsupportedOperationException("Session management is not implemented in Flink on Tez.");
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java
index b155527..a02b536 100644
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java
@@ -75,4 +75,9 @@ public class RemoteTezEnvironment extends ExecutionEnvironment {
 		compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), new org.apache.flink.configuration.Configuration());
 		executor = new TezExecutor(compiler, this.getDegreeOfParallelism());
 	}
+
+	@Override
+	public void startNewSession() throws Exception {
+		throw new UnsupportedOperationException("Session management is not implemented in Flink on Tez.");
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java
index a54724f..60449db 100644
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java
@@ -21,6 +21,7 @@ package org.apache.flink.tez.client;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.PlanExecutor;
 import org.apache.flink.configuration.Configuration;
@@ -115,6 +116,26 @@ public class TezExecutor extends PlanExecutor {
 	}
 
 	@Override
+	public void start() throws Exception {
+		throw new IllegalStateException("Session management is not supported in the TezExecutor.");
+	}
+
+	@Override
+	public void stop() throws Exception {
+		throw new IllegalStateException("Session management is not supported in the TezExecutor.");
+	}
+
+	@Override
+	public void endSession(JobID jobID) throws Exception {
+		throw new IllegalStateException("Session management is not supported in the TezExecutor.");
+	}
+
+	@Override
+	public boolean isRunning() {
+		return false;
+	}
+
+	@Override
 	public JobExecutionResult executePlan(Plan plan) throws Exception {
 		return executePlanWithConf(tezConf, plan);
 	}

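TezExecutor thus opts out of the session lifecycle (start/stop/endSession/isRunning)
that this change adds to PlanExecutor. On the local executor, which does implement it,
the lifecycle can be used roughly as follows (a sketch; plan1 and plan2 are placeholder
dataflow plans):

    import org.apache.flink.api.common.Plan;
    import org.apache.flink.client.LocalExecutor;

    static void runInOneSession(Plan plan1, Plan plan2) throws Exception {
        LocalExecutor executor = new LocalExecutor();
        executor.start();                  // keep the embedded cluster running
        try {
            executor.executePlan(plan1);
            executor.executePlan(plan2);   // executes against the same running cluster
        } finally {
            executor.stop();
        }
    }
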
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
index e8b7e86..566573e 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
@@ -55,6 +55,10 @@ public class TestEnvironment extends ExecutionEnvironment {
 	}
 
 	@Override
+	public void startNewSession() throws Exception {
+	}
+
+	@Override
 	public JobExecutionResult execute(String jobName) throws Exception {
 		try {
 			OptimizedPlan op = compileProgram(jobName);

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
index 3991ac0..3e2657c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
@@ -55,7 +55,8 @@ public class LocalExecutorITCase {
 			executor.setTaskManagerNumSlots(parallelism);
 			executor.setPrintStatusDuringExecution(false);
 			executor.start();
-			Plan wcPlan = wc.getPlan(Integer.valueOf(parallelism).toString(), inFile.toURI().toString(),outFile.toURI().toString());
+			Plan wcPlan = wc.getPlan(Integer.valueOf(parallelism).toString(),
+					inFile.toURI().toString(), outFile.toURI().toString());
 			wcPlan.setExecutionConfig(new ExecutionConfig());
 			executor.executePlan(wcPlan);
 			executor.stop();

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
index 68b099d..b1768f0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
@@ -102,7 +102,7 @@ public class RemoteEnvironmentITCase {
 		try {
 			env.execute();
 			Assert.fail("Program should not run successfully, cause of invalid akka settings.");
-		} catch (ProgramInvocationException ex) {
+		} catch (IOException ex) {
 			throw ex.getCause();
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
index 082532e..24d9416 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
@@ -20,8 +20,8 @@ package org.apache.flink.test.optimizer.jsonplan;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.Plan;
-import org.apache.flink.client.program.Client.ProgramAbortException;
-import org.apache.flink.client.program.PackagedProgram.PreviewPlanEnvironment;
+import org.apache.flink.client.program.OptimizerPlanEnvironment;
+import org.apache.flink.client.program.PreviewPlanEnvironment;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.examples.java.clustering.KMeans;
@@ -66,7 +66,7 @@ public class DumpCompiledPlanTest extends CompilerTestBase {
 		try {
 			// <points path> <centers path> <result path> <num iterations
 			KMeans.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "123"});
-		} catch(ProgramAbortException pae) {
+		} catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
 			// all good.
 		} catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java
index 121bc88..a862242 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java
@@ -311,6 +311,10 @@ public class JsonJobGraphGenerationTest {
 		}
 
 		@Override
+		public void startNewSession() throws Exception {
+		}
+
+		@Override
 		public JobExecutionResult execute(String jobName) throws Exception {
 			Plan plan = createProgramPlan(jobName);
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
index b0de0e8..6aea26c 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
@@ -18,7 +18,6 @@
 
 package org.apache.flink.api.scala.runtime.jobmanager
 
-import akka.actor.Status.Success
 import akka.actor.{ActorSystem, PoisonPill}
 import akka.testkit.{ImplicitSender, TestKit}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
index fb8e5a1..3337f1a 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
@@ -507,7 +507,7 @@ public abstract class YarnTestBase extends TestLogger {
 				"expected string did not show up", expectedStringSeen);
 
 		// check for 0 return code
-		Assert.assertTrue("Expecting return value == "+returnCode, runner.getReturnValue() == returnCode);
+		Assert.assertEquals("Expected return value", returnCode, runner.getReturnValue());
 		LOG.info("Test was successful");
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
index 57a5010..bd1698a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
@@ -25,6 +25,7 @@ import static akka.pattern.Patterns.ask;
 import akka.actor.Props;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.net.NetUtils;
@@ -231,6 +232,7 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
 	 */
 	@Override
 	public void stopAfterJob(JobID jobID) {
+		Preconditions.checkNotNull(jobID, "The job id must not be null");
 		Future<Object> messageReceived = ask(applicationClient, new Messages.StopAMAfterJob(jobID), akkaTimeout);
 		try {
 			Await.result(messageReceived, akkaDuration);


[4/4] flink git commit: [FLINK-2097][core] implement job session management

Posted by mx...@apache.org.
[FLINK-2097][core] implement job session management

Sessions make sure that the JobManager does not immediately discard a
JobGraph after execution, but keeps it around for further operations to
be attached to the graph. That is the basis for interactive sessions.

This pull request implements rudimentary session management. Together
with the backtracking work in #640, this will enable users to submit jobs to the
cluster and access intermediate results. Session handling ensures that
the results are eventually cleared.

ExecutionGraphs are kept as long as
  - no timeout has occurred, or
  - the session has not been explicitly ended

The following changes have also been made in this pull request:

- The Job ID is created through the ExecutionEnvironment and passed through

- Sessions can be terminated by the ExecutionEnvironment or directly
  through the executor

- The environments use reapers (local) and shutdown hooks (remote) to
  ensure session termination when the environment goes out of scope

- The Client manages only connections to the JobManager; it is not
  job-specific

This closes #858.
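
For illustration, the user-facing flow this enables could look as follows. This is a
minimal sketch against the Java ExecutionEnvironment, to be run inside a method that
declares throws Exception; the job names are placeholders and the session API must be
enabled for it to run:

    import org.apache.flink.api.java.ExecutionEnvironment;

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setSessionTimeout(3600);    // keep job and intermediate results for an hour
    // ... define a first batch of operations ...
    env.execute("session part 1");  // results stay cached on the JobManager
    // ... attach further operations that may build on the cached results ...
    env.execute("session part 2");
    env.startNewSession();          // end the session explicitly, discarding results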


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/71bf2f57
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/71bf2f57
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/71bf2f57

Branch: refs/heads/master
Commit: 71bf2f570861daae53b24bfcf1d06aedb85311b9
Parents: 7984acc
Author: Maximilian Michels <mx...@apache.org>
Authored: Fri Sep 4 17:34:44 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Tue Sep 22 19:55:46 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    | 182 +++---
 .../org/apache/flink/client/LocalExecutor.java  | 222 ++++---
 .../org/apache/flink/client/RemoteExecutor.java | 188 ++++--
 .../org/apache/flink/client/program/Client.java | 653 +++++++++----------
 .../client/program/ContextEnvironment.java      |  40 +-
 .../flink/client/program/JobWithJars.java       |   5 +-
 .../program/OptimizerPlanEnvironment.java       | 132 ++++
 .../flink/client/program/PackagedProgram.java   |  51 +-
 .../client/program/PreviewPlanEnvironment.java  |  80 +++
 .../flink/client/web/JobSubmissionServlet.java  |  30 +-
 .../flink/client/CliFrontendInfoTest.java       |  81 +--
 .../client/CliFrontendPackageProgramTest.java   |  10 +-
 .../apache/flink/client/CliFrontendRunTest.java |  30 +-
 .../RemoteExecutorHostnameResolutionTest.java   |   6 +-
 .../client/program/ClientConnectionTest.java    |   8 +-
 .../apache/flink/client/program/ClientTest.java | 142 ++--
 .../ExecutionPlanAfterExecutionTest.java        |  15 +-
 .../program/ExecutionPlanCreationTest.java      |   9 +-
 .../client/program/PackagedProgramTest.java     |   1 -
 .../stormcompatibility/api/FlinkClient.java     |  22 +-
 .../flink/api/common/JobExecutionResult.java    |  17 +-
 .../java/org/apache/flink/api/common/JobID.java |  46 +-
 .../flink/api/common/JobSubmissionResult.java   |   5 +-
 .../java/org/apache/flink/api/common/Plan.java  |  71 +-
 .../apache/flink/api/common/PlanExecutor.java   |  85 ++-
 .../flink/api/java/CollectionEnvironment.java   |   4 +
 .../flink/api/java/ExecutionEnvironment.java    |  97 ++-
 .../apache/flink/api/java/LocalEnvironment.java | 180 ++++-
 .../flink/api/java/RemoteEnvironment.java       | 153 ++++-
 .../flink/optimizer/plan/OptimizedPlan.java     |  15 +-
 .../plantranslate/JobGraphGenerator.java        |  24 +-
 .../optimizer/postpass/JavaApiPostPass.java     |   2 +-
 .../apache/flink/runtime/client/JobClient.java  |  11 +-
 .../runtime/client/JobExecutionException.java   |   2 +-
 .../runtime/executiongraph/ExecutionGraph.java  |   8 +
 .../apache/flink/runtime/jobgraph/JobGraph.java |  58 +-
 .../runtime/taskmanager/TaskExecutionState.java |   2 +-
 .../flink/runtime/jobmanager/JobInfo.scala      |  18 +-
 .../flink/runtime/jobmanager/JobManager.scala   |  98 ++-
 .../runtime/jobmanager/MemoryArchivist.scala    |  11 +-
 .../runtime/messages/JobManagerMessages.scala   |   6 +
 .../runtime/minicluster/FlinkMiniCluster.scala  |   9 +-
 .../PartialConsumePipelinedResultTest.java      |   3 +-
 .../TaskManagerProcessReapingTest.java          |   2 +-
 .../runtime/jobmanager/JobManagerITCase.scala   | 126 +++-
 .../flink/api/scala/ExecutionEnvironment.scala  |  31 +-
 .../api/avro/AvroExternalJarProgramITCase.java  |  38 +-
 .../environment/RemoteStreamEnvironment.java    |  24 +-
 .../environment/StreamContextEnvironment.java   |  28 +-
 .../environment/StreamExecutionEnvironment.java |  11 +-
 .../api/environment/StreamPlanEnvironment.java  |   7 +-
 .../flink/tez/client/LocalTezEnvironment.java   |   5 +
 .../flink/tez/client/RemoteTezEnvironment.java  |   5 +
 .../apache/flink/tez/client/TezExecutor.java    |  21 +
 .../apache/flink/test/util/TestEnvironment.java |   4 +
 .../clients/examples/LocalExecutorITCase.java   |   3 +-
 .../RemoteEnvironmentITCase.java                |   2 +-
 .../jsonplan/DumpCompiledPlanTest.java          |   6 +-
 .../jsonplan/JsonJobGraphGenerationTest.java    |   4 +
 .../jobmanager/JobManagerFailsITCase.scala      |   1 -
 .../org/apache/flink/yarn/YarnTestBase.java     |   2 +-
 .../org/apache/flink/yarn/FlinkYarnCluster.java |   2 +
 62 files changed, 2114 insertions(+), 1040 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index fc4d98a..f0e6c4f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -57,10 +57,13 @@ import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
 import org.apache.flink.optimizer.plan.FlinkPlan;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
+import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -297,44 +300,51 @@ public class CliFrontend {
 			int userParallelism = options.getParallelism();
 			LOG.debug("User parallelism is set to {}", userParallelism);
 
-			Client client = getClient(options, program.getUserCodeClassLoader(), program.getMainClassName(), userParallelism);
+			Client client = getClient(options, program.getMainClassName(), userParallelism);
 			client.setPrintStatusDuringExecution(options.getStdoutLogging());
 			LOG.debug("Client slots is set to {}", client.getMaxSlots());
-			if(client.getMaxSlots() != -1 && userParallelism == -1) {
-				logAndSysout("Using the parallelism provided by the remote cluster ("+client.getMaxSlots()+"). " +
-						"To use another parallelism, set it at the ./bin/flink client.");
-				userParallelism = client.getMaxSlots();
-			}
 
-			// check if detached per job yarn cluster is used to start flink
-			if(yarnCluster != null && yarnCluster.isDetached()) {
-				logAndSysout("The Flink YARN client has been started in detached mode. In order to stop " +
-						"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
-						"yarn application -kill " + yarnCluster.getApplicationId() + "\n" +
-						"Please also note that the temporary files of the YARN session in the home directoy will not be removed.");
-				exitCode = executeProgram(program, client, userParallelism, false);
-			} else {
-				// regular (blocking) execution.
-				exitCode = executeProgram(program, client, userParallelism, true);
-			}
+			try {
+				if (client.getMaxSlots() != -1 && userParallelism == -1) {
+					logAndSysout("Using the parallelism provided by the remote cluster ("+client.getMaxSlots()+"). " +
+							"To use another parallelism, set it at the ./bin/flink client.");
+					userParallelism = client.getMaxSlots();
+				}
 
-			// show YARN cluster status if its not a detached YARN cluster.
-			if (yarnCluster != null && !yarnCluster.isDetached()) {
-				List<String> msgs = yarnCluster.getNewMessages();
-				if (msgs != null && msgs.size() > 1) {
+				// check if detached per job yarn cluster is used to start flink
+				if (yarnCluster != null && yarnCluster.isDetached()) {
+					logAndSysout("The Flink YARN client has been started in detached mode. In order to stop " +
+							"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
+							"yarn application -kill " + yarnCluster.getApplicationId() + "\n" +
+							"Please also note that the temporary files of the YARN session in the home directory will not be removed.");
+					exitCode = executeProgramDetached(program, client, userParallelism);
+				}
+				else {
+					// regular (blocking) execution.
+					exitCode = executeProgramBlocking(program, client, userParallelism);
+				}
+
+				// show YARN cluster status if it's not a detached YARN cluster.
+				if (yarnCluster != null && !yarnCluster.isDetached()) {
+					List<String> msgs = yarnCluster.getNewMessages();
+					if (msgs != null && msgs.size() > 1) {
 
-					logAndSysout("The following messages were created by the YARN cluster while running the Job:");
-					for (String msg : msgs) {
-						logAndSysout(msg);
+						logAndSysout("The following messages were created by the YARN cluster while running the Job:");
+						for (String msg : msgs) {
+							logAndSysout(msg);
+						}
+					}
+					if (yarnCluster.hasFailed()) {
+						logAndSysout("YARN cluster is in failed state!");
+						logAndSysout("YARN Diagnostics: " + yarnCluster.getDiagnostics());
 					}
 				}
-				if (yarnCluster.hasFailed()) {
-					logAndSysout("YARN cluster is in failed state!");
-					logAndSysout("YARN Diagnostics: " + yarnCluster.getDiagnostics());
-				}
-			}
 
-			return exitCode;
+				return exitCode;
+			}
+			finally {
+				client.shutdown();
+			}
 		}
 		catch (Throwable t) {
 			return handleError(t);
@@ -395,8 +405,10 @@ public class CliFrontend {
 			int parallelism = options.getParallelism();
 
 			LOG.info("Creating program plan dump");
-			Client client = getClient(options, program.getUserCodeClassLoader(), program.getMainClassName(), parallelism);
-			FlinkPlan flinkPlan = client.getOptimizedPlan(program, parallelism);
+
+			Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
+
+			FlinkPlan flinkPlan = Client.getOptimizedPlan(compiler, program, parallelism);
 
 			if (webFrontend) {
 				this.optimizedPlan = flinkPlan;
@@ -425,6 +437,8 @@ public class CliFrontend {
 				}
 			}
 			return 0;
+
+
 		}
 		catch (Throwable t) {
 			return handleError(t);
@@ -623,52 +637,65 @@ public class CliFrontend {
 	//  Interaction with programs and JobManager
 	// --------------------------------------------------------------------------------------------
 
-	protected int executeProgram(PackagedProgram program, Client client, int parallelism, boolean wait) {
-		LOG.info("Starting execution of program");
-		JobSubmissionResult execResult;
+	protected int executeProgramDetached(PackagedProgram program, Client client, int parallelism) {
+		JobSubmissionResult result;
 		try {
-			execResult = client.run(program, parallelism, wait);
-		}
-		catch (ProgramInvocationException e) {
+			result = client.runDetached(program, parallelism);
+		} catch (ProgramInvocationException e) {
 			return handleError(e);
-		}
-		finally {
+		} finally {
 			program.deleteExtractedLibraries();
 		}
 
-		if(wait) {
-			LOG.info("Program execution finished");
-		}
-
-		// we come here after the job has finished (or the job has been submitted)
-		if (execResult != null) {
+		if (result != null) {
 			// if the job has been submitted to a detached YARN cluster, there won't be any
 			// exec results, but the object will be set (for the job id)
 			if (yarnCluster != null && yarnCluster.isDetached()) {
-				if(execResult.getJobID() == null) {
-					throw new RuntimeException("Error while starting job. No Job ID set.");
-				}
-				yarnCluster.stopAfterJob(execResult.getJobID());
+
+				yarnCluster.stopAfterJob(result.getJobID());
 				yarnCluster.disconnect();
-				if(!webFrontend) {
-					System.out.println("The Job has been submitted with JobID "+execResult.getJobID());
+				if (!webFrontend) {
+					System.out.println("The Job has been submitted with JobID " + result.getJobID());
 				}
 				return 0;
-			}
-			if (execResult instanceof JobExecutionResult) {
-				JobExecutionResult result = (JobExecutionResult) execResult;
-				if(!webFrontend) {
-					System.out.println("Job Runtime: " + result.getNetRuntime() + " ms");
-				}
-				Map<String, Object> accumulatorsResult = result.getAllAccumulatorResults();
-				if (accumulatorsResult.size() > 0 && !webFrontend) {
-					System.out.println("Accumulator Results: ");
-					System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult));
-				}
 			} else {
-				LOG.info("The Job did not return an execution result");
+				throw new RuntimeException("Error while starting job. No Job ID set.");
+			}
+		}
+
+		return 0;
+	}
+
+	protected int executeProgramBlocking(PackagedProgram program, Client client, int parallelism) {
+		LOG.info("Starting execution of program");
+
+		JobExecutionResult result;
+		try {
+			client.setPrintStatusDuringExecution(true);
+			result = client.runBlocking(program, parallelism);
+		}
+		catch (ProgramInvocationException e) {
+			return handleError(e);
+		}
+		finally {
+			program.deleteExtractedLibraries();
+		}
+
+		LOG.info("Program execution finished");
+
+		if (result != null) {
+			if (!webFrontend) {
+				System.out.println("Job Runtime: " + result.getNetRuntime() + " ms");
+			}
+			Map<String, Object> accumulatorsResult = result.getAllAccumulatorResults();
+			if (accumulatorsResult.size() > 0 && !webFrontend) {
+				System.out.println("Accumulator Results: ");
+				System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult));
 			}
+		} else {
+			LOG.info("The Job did not return an execution result");
 		}
+
 		return 0;
 	}
 
@@ -767,7 +794,6 @@ public class CliFrontend {
 	 * Retrieves a {@link Client} object from the given command line options and other parameters.
 	 *
 	 * @param options Command line options which contain JobManager address
-	 * @param classLoader Class loader to use by the Client
 	 * @param programName Program name
 	 * @param userParallelism Given user parallelism
 	 * @return
@@ -775,12 +801,10 @@ public class CliFrontend {
 	 */
 	protected Client getClient(
 			CommandLineOptions options,
-			ClassLoader classLoader,
 			String programName,
 			int userParallelism)
 		throws Exception {
-		InetSocketAddress jobManagerAddress = null;
-
+		InetSocketAddress jobManagerAddress;
 		int maxSlots = -1;
 
 		if (YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) {
@@ -796,14 +820,16 @@ public class CliFrontend {
 
 			// the number of slots available from YARN:
 			int yarnTmSlots = flinkYarnClient.getTaskManagerSlots();
-			if(yarnTmSlots == -1) {
+			if (yarnTmSlots == -1) {
 				yarnTmSlots = 1;
 			}
 			maxSlots = yarnTmSlots * flinkYarnClient.getTaskManagerCount();
-			if(userParallelism != -1) {
+			if (userParallelism != -1) {
 				int slotsPerTM = userParallelism / flinkYarnClient.getTaskManagerCount();
-				logAndSysout("The YARN cluster has "+maxSlots+" slots available, but the user requested a parallelism of "+userParallelism+" on YARN. " +
-						"Each of the "+flinkYarnClient.getTaskManagerCount()+" TaskManagers will get "+slotsPerTM+" slots.");
+				logAndSysout("The YARN cluster has " + maxSlots + " slots available, " +
+						"but the user requested a parallelism of " + userParallelism + " on YARN. " +
+						"Each of the " + flinkYarnClient.getTaskManagerCount() + " TaskManagers " +
+						"will get " + slotsPerTM + " slots.");
 				flinkYarnClient.setTaskManagerSlots(slotsPerTM);
 			}
 
@@ -811,11 +837,12 @@ public class CliFrontend {
 				yarnCluster = flinkYarnClient.deploy();
 				yarnCluster.connectToCluster();
 			}
-			catch(Exception e) {
+			catch (Exception e) {
 				throw new RuntimeException("Error deploying the YARN cluster", e);
 			}
 
 			jobManagerAddress = yarnCluster.getJobManagerAddress();
+			writeJobManagerAddressToConfig(jobManagerAddress);
 
 			logAndSysout("YARN cluster started");
 			logAndSysout("JobManager web interface address " + yarnCluster.getWebInterfaceURL());
@@ -847,14 +874,11 @@ public class CliFrontend {
 		else {
 			if(options.getJobManagerAddress() != null) {
 				jobManagerAddress = parseHostPortAddress(options.getJobManagerAddress());
+				writeJobManagerAddressToConfig(jobManagerAddress);
 			}
 		}
 
-		if(jobManagerAddress != null) {
-			writeJobManagerAddressToConfig(jobManagerAddress);
-		}
-
-		return new Client(config, classLoader, maxSlots);
+		return new Client(config, maxSlots);
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index cf08e0a..7928e53 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -22,11 +22,14 @@ import java.util.List;
 
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.PlanExecutor;
 import org.apache.flink.api.common.Program;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.optimizer.DataStatistics;
@@ -35,52 +38,66 @@ import org.apache.flink.optimizer.dag.DataSinkNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 
 /**
- * A class for executing a {@link Plan} on a local embedded Flink runtime instance.
+ * A PlanExecutor that runs Flink programs on a local embedded Flink runtime instance.
+ *
+ * <p>When the {@link #executePlan(org.apache.flink.api.common.Plan)} method is called directly,
+ * this executor will start up and shut down again immediately after the program has finished.</p>
+ *
+ * <p>To execute many dataflow programs that together constitute one job with this executor,
+ * the executor needs to be explicitly started, so that it keeps running across several executions.</p>
  */
 public class LocalExecutor extends PlanExecutor {
 	
-	private static boolean DEFAULT_OVERWRITE = false;
+	private static final boolean DEFAULT_OVERWRITE = false;
 
 	private static final int DEFAULT_TASK_MANAGER_NUM_SLOTS = -1;
 
-	private final Object lock = new Object();	// we lock to ensure singleton execution
-	
+	/** we lock to ensure singleton execution */
+	private final Object lock = new Object();
+
+	/** The mini cluster on which to execute the local programs */
 	private LocalFlinkMiniCluster flink;
 
+	/** Custom user configuration for the execution */
 	private Configuration configuration;
 
-	// ---------------------------------- config options ------------------------------------------
-	
+	/** Config value for how many slots to provide in the local cluster */
 	private int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
 
+	/** Config flag whether to overwrite existing files by default */
 	private boolean defaultOverwriteFiles = DEFAULT_OVERWRITE;
-	
-	// --------------------------------------------------------------------------------------------
-	
+
+	// ------------------------------------------------------------------------
+
 	public LocalExecutor() {
-		if (!ExecutionEnvironment.localExecutionIsAllowed()) {
-			throw new InvalidProgramException("The LocalEnvironment cannot be used when submitting a program through a client.");
-		}
+		this(null);
 	}
 
 	public LocalExecutor(Configuration conf) {
-		this();
-		this.configuration = conf;
+		if (!ExecutionEnvironment.localExecutionIsAllowed()) {
+			throw new InvalidProgramException(
+					"The LocalEnvironment cannot be used when submitting a program through a client.");
+		}
+
+		this.configuration = conf != null ? conf : new Configuration();
 	}
 
+	// ------------------------------------------------------------------------
+	//  Configuration
+	// ------------------------------------------------------------------------
 
-	
 	public boolean isDefaultOverwriteFiles() {
 		return defaultOverwriteFiles;
 	}
-	
+
 	public void setDefaultOverwriteFiles(boolean defaultOverwriteFiles) {
 		this.defaultOverwriteFiles = defaultOverwriteFiles;
 	}
-	
+
 	public void setTaskManagerNumSlots(int taskManagerNumSlots) {
 		this.taskManagerNumSlots = taskManagerNumSlots; 
 	}
@@ -88,51 +105,48 @@ public class LocalExecutor extends PlanExecutor {
 	public int getTaskManagerNumSlots() {
 		return this.taskManagerNumSlots;
 	}
-	
-	// --------------------------------------------------------------------------------------------
 
-	public static Configuration createConfiguration(LocalExecutor le) {
-		Configuration configuration = new Configuration();
-		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, le.getTaskManagerNumSlots());
-		configuration.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, le.isDefaultOverwriteFiles());
-		return configuration;
-	}
+	// --------------------------------------------------------------------------------------------
 
+	@Override
 	public void start() throws Exception {
-		synchronized (this.lock) {
-			if (this.flink == null) {
-				
+		synchronized (lock) {
+			if (flink == null) {
 				// create the embedded runtime
-				Configuration configuration = createConfiguration(this);
-				if(this.configuration != null) {
+				Configuration configuration = createConfiguration();
+				if (this.configuration != null) {
 					configuration.addAll(this.configuration);
 				}
 				// start it up
-				this.flink = new LocalFlinkMiniCluster(configuration, true);
+				flink = new LocalFlinkMiniCluster(configuration, true);
 				this.flink.start();
 			} else {
 				throw new IllegalStateException("The local executor was already started.");
 			}
 		}
 	}
-
-	/**
-	 * Stop the local executor instance. You should not call executePlan after this.
-	 */
+	
+	@Override
 	public void stop() throws Exception {
-		synchronized (this.lock) {
-			if (this.flink != null) {
-				this.flink.stop();
-				this.flink = null;
-			} else {
-				throw new IllegalStateException("The local executor was not started.");
+		synchronized (lock) {
+			if (flink != null) {
+				flink.stop();
+				flink = null;
 			}
 		}
 	}
 
+	@Override
+	public boolean isRunning() {
+		return flink != null;
+	}
+
 	/**
-	 * Execute the given plan on the local Nephele instance, wait for the job to
-	 * finish and return the runtime in milliseconds.
+	 * Executes the given program on a local runtime and waits for the job to finish.
+	 * 
+	 * <p>If the executor has not been started before, this starts the executor and shuts it down
+	 * after the job finishes. If the job runs in session mode, the executor is kept alive until
+	 * no more references to the executor exist.</p>
 	 * 
 	 * @param plan The plan of the program to execute.
 	 * @return The net runtime of the program, in milliseconds.
@@ -145,15 +159,15 @@ public class LocalExecutor extends PlanExecutor {
 		if (plan == null) {
 			throw new IllegalArgumentException("The plan may not be null.");
 		}
-		
+
 		synchronized (this.lock) {
-			
+
 			// check if we start a session dedicated for this execution
 			final boolean shutDownAtEnd;
-			if (this.flink == null) {
-				// we start a session just for us now
+
+			if (flink == null) {
 				shutDownAtEnd = true;
-				
+
 				// configure the number of local slots equal to the parallelism of the local plan
 				if (this.taskManagerNumSlots == DEFAULT_TASK_MANAGER_NUM_SLOTS) {
 					int maxParallelism = plan.getMaximumParallelism();
@@ -161,9 +175,11 @@ public class LocalExecutor extends PlanExecutor {
 						this.taskManagerNumSlots = maxParallelism;
 					}
 				}
-				
+
+				// start the cluster for us
 				start();
-			} else {
+			}
+			else {
 				// we use the existing session
 				shutDownAtEnd = false;
 			}
@@ -173,10 +189,10 @@ public class LocalExecutor extends PlanExecutor {
 
 				Optimizer pc = new Optimizer(new DataStatistics(), configuration);
 				OptimizedPlan op = pc.compile(plan);
-				
+
 				JobGraphGenerator jgg = new JobGraphGenerator(configuration);
-				JobGraph jobGraph = jgg.compileJobGraph(op);
-				
+				JobGraph jobGraph = jgg.compileJobGraph(op, plan.getJobId());
+
 				boolean sysoutPrint = isPrintingStatusDuringExecution();
 				return flink.submitJobAndWait(jobGraph, sysoutPrint);
 			}
@@ -189,32 +205,50 @@ public class LocalExecutor extends PlanExecutor {
 	}
 
 	/**
-	 * Returns a JSON dump of the optimized plan.
-	 * 
-	 * @param plan
-	 *            The program's plan.
-	 * @return JSON dump of the optimized plan.
-	 * @throws Exception
+	 * Creates a JSON representation of the given dataflow's execution plan.
+	 *
+	 * @param plan The dataflow plan.
+	 * @return The dataflow's execution plan, as a JSON string.
+	 * @throws Exception Thrown, if the optimization process that creates the execution plan failed.
 	 */
 	@Override
 	public String getOptimizerPlanAsJSON(Plan plan) throws Exception {
-		Optimizer pc = new Optimizer(new DataStatistics(), createConfiguration(this));
+		final int parallelism = plan.getDefaultParallelism() == -1 ? 1 : plan.getDefaultParallelism();
+
+		Optimizer pc = new Optimizer(new DataStatistics(), this.configuration);
+		pc.setDefaultParallelism(parallelism);
 		OptimizedPlan op = pc.compile(plan);
-		PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();
-	
-		return gen.getOptimizerPlanAsJSON(op);
+
+		return new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(op);
 	}
-	
+
+	@Override
+	public void endSession(JobID jobID) throws Exception {
+		LocalFlinkMiniCluster flink = this.flink;
+		if (flink != null) {
+			ActorGateway leaderGateway = flink.getLeaderGateway(AkkaUtils.getDefaultTimeout());
+			leaderGateway.tell(new JobManagerMessages.RemoveCachedJob(jobID));
+		}
+	}
+
+	private Configuration createConfiguration() {
+		Configuration configuration = new Configuration();
+		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, getTaskManagerNumSlots());
+		configuration.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, isDefaultOverwriteFiles());
+		return configuration;
+	}
+
+
 	// --------------------------------------------------------------------------------------------
 	//  Static variants that internally bring up an instance and shut it down after the execution
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
-	 * Executes the program described by the given plan assembler.
+	 * Executes the given program.
 	 * 
-	 * @param pa The program's plan assembler. 
+	 * @param pa The program.
 	 * @param args The parameters.
-	 * @return The net runtime of the program, in milliseconds.
+	 * @return The execution result of the program.
 	 * 
 	 * @throws Exception Thrown, if either the startup of the local execution context, or the execution
 	 *                   caused an exception.
@@ -222,57 +256,45 @@ public class LocalExecutor extends PlanExecutor {
 	public static JobExecutionResult execute(Program pa, String... args) throws Exception {
 		return execute(pa.getPlan(args));
 	}
-	
+
 	/**
-	 * Executes the program represented by the given Pact plan.
+	 * Executes the given dataflow plan.
 	 * 
-	 * @param plan The program's plan. 
-	 * @return The net runtime of the program, in milliseconds.
+	 * @param plan The dataflow plan. 
+	 * @return The execution result.
 	 * 
 	 * @throws Exception Thrown, if either the startup of the local execution context, or the execution
 	 *                   caused an exception.
 	 */
 	public static JobExecutionResult execute(Plan plan) throws Exception {
-		LocalExecutor exec = new LocalExecutor();
-		try {
-			exec.start();
-			return exec.executePlan(plan);
-		} finally {
-			exec.stop();
-		}
+		return new LocalExecutor().executePlan(plan);
 	}
 
 	/**
-	 * Returns a JSON dump of the optimized plan.
+	 * Creates a JSON representation of the given dataflow's execution plan.
 	 * 
-	 * @param plan
-	 *            The program's plan.
-	 * @return JSON dump of the optimized plan.
-	 * @throws Exception
+	 * @param plan The dataflow plan.
+	 * @return The dataflow's execution plan, as a JSON string.
+	 * @throws Exception Thrown, if the optimization process that creates the execution plan failed.
 	 */
 	public static String optimizerPlanAsJSON(Plan plan) throws Exception {
-		LocalExecutor exec = new LocalExecutor();
-		try {
-			exec.start();
-			Optimizer pc = new Optimizer(new DataStatistics(), exec.flink.configuration());
-			OptimizedPlan op = pc.compile(plan);
-			PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();
-
-			return gen.getOptimizerPlanAsJSON(op);
-		} finally {
-			exec.stop();
-		}
+		final int parallelism = plan.getDefaultParallelism() == -1 ? 1 : plan.getDefaultParallelism();
+
+		Optimizer pc = new Optimizer(new DataStatistics(), new Configuration());
+		pc.setDefaultParallelism(parallelism);
+		OptimizedPlan op = pc.compile(plan);
+
+		return new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(op);
 	}
 
 	/**
-	 * Return unoptimized plan as JSON.
+	 * Creates a JSON representation of the given dataflow plan.
 	 * 
-	 * @param plan The program plan.
-	 * @return The plan as a JSON object.
+	 * @param plan The dataflow plan.
+	 * @return The dataflow plan (prior to optimization) as a JSON string.
 	 */
 	public static String getPlanAsJSON(Plan plan) {
-		PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();
 		List<DataSinkNode> sinks = Optimizer.createPreOptimizedPlan(plan);
-		return gen.getPactPlanAsJSON(sinks);
+		return new PlanJSONDumpGenerator().getPactPlanAsJSON(sinks);
 	}
 }
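
The reworked LocalExecutor distinguishes one-shot execution from session-style
execution. Below is a minimal usage sketch, not part of the commit; the Plan
values are assumed to come from an already-built program.

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.client.LocalExecutor;

public class LocalExecutorUsageSketch {

	// One-shot execution: executePlan() brings up the embedded mini cluster
	// just for this plan and shuts it down again when the job finishes.
	static void runOnce(Plan plan) throws Exception {
		JobExecutionResult result = LocalExecutor.execute(plan);
		System.out.println("Net runtime: " + result.getNetRuntime() + " ms");
	}

	// Session-style execution: an explicitly started executor keeps the
	// embedded cluster alive across several executePlan() calls.
	static void runAsSession(Plan first, Plan second) throws Exception {
		LocalExecutor executor = new LocalExecutor();
		executor.start();
		try {
			executor.executePlan(first);
			executor.executePlan(second);
		} finally {
			executor.stop();
		}
	}
}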

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index 20169f6..e8e9ade 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.client;
 
-import java.io.File;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -26,36 +25,41 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.PlanExecutor;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.JobWithJars;
-import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.configuration.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * The RemoteExecutor is a {@link org.apache.flink.api.common.PlanExecutor} that takes the program
  * and ships it to a remote Flink cluster for execution.
  * 
- * The RemoteExecutor is pointed at the JobManager and gets the program and (if necessary) the
- * set of libraries that need to be shipped together with the program.
+ * <p>The RemoteExecutor is pointed at the JobManager and gets the program and (if necessary) the
+ * set of libraries that need to be shipped together with the program.</p>
  * 
- * The RemoteExecutor is used in the {@link org.apache.flink.api.java.RemoteEnvironment} to
- * remotely execute program parts.
+ * <p>The RemoteExecutor is used in the {@link org.apache.flink.api.java.RemoteEnvironment} to
+ * remotely execute program parts.</p>
  */
 public class RemoteExecutor extends PlanExecutor {
+		
+	private final Object lock = new Object();
 	
-	private static final Logger LOG = LoggerFactory.getLogger(RemoteExecutor.class);
-
 	private final List<String> jarFiles;
 
 	private final Configuration clientConfiguration;
+
+	private Client client;
+	
+	private int defaultParallelism = 1;
+	
 	
 	public RemoteExecutor(String hostname, int port) {
 		this(hostname, port, Collections.<String>emptyList(), new Configuration());
@@ -97,51 +101,148 @@ public class RemoteExecutor extends PlanExecutor {
 		clientConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, inet.getPort());
 	}
 
+	// ------------------------------------------------------------------------
+	//  Properties
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Sets the parallelism that will be used when the program does not define
+	 * any parallelism at all.
+	 *
+	 * @param defaultParallelism The default parallelism for the executor.
+	 */
+	public void setDefaultParallelism(int defaultParallelism) {
+		if (defaultParallelism < 1) {
+			throw new IllegalArgumentException("The default parallelism must be at least one");
+		}
+		this.defaultParallelism = defaultParallelism;
+	}
+
+	/**
+	 * Gets the parallelism that will be used when the program does not define
+	 * any parallelism at all.
+	 * 
+	 * @return The default parallelism for the executor.
+	 */
+	public int getDefaultParallelism() {
+		return defaultParallelism;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Startup & Shutdown
+	// ------------------------------------------------------------------------
+
+
+	@Override
+	public void start() throws Exception {
+		synchronized (lock) {
+			if (client == null) {
+				client = new Client(clientConfiguration);
+				client.setPrintStatusDuringExecution(isPrintingStatusDuringExecution());
+			}
+			else {
+				throw new IllegalStateException("The remote executor was already started.");
+			}
+		}
+	}
+
+	@Override
+	public void stop() throws Exception {
+		synchronized (lock) {
+			if (client != null) {
+				client.shutdown();
+				client = null;
+			}
+		}
+	}
+
+	@Override
+	public boolean isRunning() {
+		return client != null;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Executing programs
+	// ------------------------------------------------------------------------
+
 	@Override
 	public JobExecutionResult executePlan(Plan plan) throws Exception {
+		if (plan == null) {
+			throw new IllegalArgumentException("The plan may not be null.");
+		}
+
 		JobWithJars p = new JobWithJars(plan, this.jarFiles);
 		return executePlanWithJars(p);
 	}
-	
-	public JobExecutionResult executePlanWithJars(JobWithJars p) throws Exception {
-		Client c = new Client(clientConfiguration, p.getUserCodeClassLoader(), -1);
-		c.setPrintStatusDuringExecution(isPrintingStatusDuringExecution());
-		
-		JobSubmissionResult result = c.run(p, -1, true);
-		if (result instanceof JobExecutionResult) {
-			return (JobExecutionResult) result;
-		} else {
-			LOG.warn("The Client didn't return a JobExecutionResult");
-			return new JobExecutionResult(result.getJobID(), -1, null);
+
+	public JobExecutionResult executePlanWithJars(JobWithJars program) throws Exception {
+		if (program == null) {
+			throw new IllegalArgumentException("The job may not be null.");
 		}
-	}
 
-	public JobExecutionResult executeJar(String jarPath, String assemblerClass, String... args) throws Exception {
-		File jarFile = new File(jarPath);
-		PackagedProgram program = new PackagedProgram(jarFile, assemblerClass, args);
-		
-		Client c = new Client(clientConfiguration, program.getUserCodeClassLoader(), -1);
-		c.setPrintStatusDuringExecution(isPrintingStatusDuringExecution());
-		
-		JobSubmissionResult result = c.run(program.getPlanWithJars(), -1, true);
-		if(result instanceof JobExecutionResult) {
-			return (JobExecutionResult) result;
-		} else {
-			LOG.warn("The Client didn't return a JobExecutionResult");
-			return new JobExecutionResult(result.getJobID(), -1, null);
+		synchronized (this.lock) {
+			// check if we start a session dedicated for this execution
+			final boolean shutDownAtEnd;
+
+			if (client == null) {
+				shutDownAtEnd = true;
+				// start the executor for us
+				start();
+			}
+			else {
+				// we use the existing session
+				shutDownAtEnd = false;
+			}
+
+			try {
+				return client.runBlocking(program, defaultParallelism);
+			}
+			finally {
+				if (shutDownAtEnd) {
+					stop();
+				}
+			}
 		}
 	}
 
 	@Override
 	public String getOptimizerPlanAsJSON(Plan plan) throws Exception {
-		JobWithJars p = new JobWithJars(plan, this.jarFiles);
-		Client c = new Client(clientConfiguration, p.getUserCodeClassLoader(), -1);
-		
-		OptimizedPlan op = (OptimizedPlan) c.getOptimizedPlan(p, -1);
-		PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
-		return jsonGen.getOptimizerPlanAsJSON(op);
+		Optimizer opt = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), new Configuration());
+		OptimizedPlan optPlan = opt.compile(plan);
+		return new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(optPlan);
 	}
-	
+
+	@Override
+	public void endSession(JobID jobID) throws Exception {
+		if (jobID == null) {
+			throw new NullPointerException("The supplied jobID must not be null.");
+		}
+
+		synchronized (this.lock) {
+			// check if we need to start a client just for this call
+			final boolean shutDownAtEnd;
+
+			if (client == null) {
+				shutDownAtEnd = true;
+				// start the executor for us
+				start();
+			}
+			else {
+				// we use the existing session
+				shutDownAtEnd = false;
+			}
+
+			try {
+				client.endSession(jobID);
+			}
+			finally {
+				if (shutDownAtEnd) {
+					stop();
+				}
+			}
+		}
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//   Utilities
 	// --------------------------------------------------------------------------------------------
@@ -168,5 +269,4 @@ public class RemoteExecutor extends PlanExecutor {
 		}
 		return new InetSocketAddress(host, port);
 	}
-	
 }
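
The RemoteExecutor now follows the same lifecycle: an un-started executor
creates a Client just for a single executePlan() call, while an explicitly
started executor reuses one Client (and thus one session) across calls. A
minimal sketch, not part of the commit; the hostname and port are placeholders.

import org.apache.flink.api.common.Plan;
import org.apache.flink.client.RemoteExecutor;

public class RemoteExecutorUsageSketch {

	static void runAgainstCluster(Plan first, Plan second) throws Exception {
		// "jobmanager-host" and 6123 are placeholders for a real JobManager address.
		RemoteExecutor executor = new RemoteExecutor("jobmanager-host", 6123);
		executor.setDefaultParallelism(4);

		// Started explicitly, so both plans run through the same Client/session.
		executor.start();
		try {
			executor.executePlan(first);
			executor.executePlan(second);
		} finally {
			executor.stop();
		}
	}
}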

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index e7464c8..6c886fe 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -18,10 +18,9 @@
 
 package org.apache.flink.client.program;
 
-import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
-import java.io.PrintStream;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -32,8 +31,6 @@ import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.ExecutionEnvironmentFactory;
 import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
@@ -65,7 +62,6 @@ import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 import akka.actor.ActorSystem;
 
-import com.google.common.base.Preconditions;
 
 /**
  * Encapsulates the functionality necessary to submit a program to a remote cluster.
@@ -78,62 +74,139 @@ public class Client {
 	 * The configuration to use for the client (optimizer, timeouts, ...) and to connect to the
 	 * JobManager.
 	 */
-	private final Configuration configuration;
-
 	/** The optimizer used in the optimization of batch programs */
-	private final Optimizer compiler;
+	final Optimizer compiler;
+	
+	/** The actor system used to communicate with the JobManager */
+	private final ActorSystem actorSystem;
 
-	/** The class loader to use for classes from the user program (e.g., functions and data types) */
-	private final ClassLoader userCodeClassLoader;
+	/** The actor reference to the JobManager */
+	private final ActorGateway jobManagerGateway;
+
+	/** The timeout for communication between the client and the JobManager */
+	private final FiniteDuration timeout;
+	
+	/**
+	 * If != -1, this field specifies the total number of available slots on the cluster
+	 * connected to the client.
+	 */
+	private final int maxSlots;
 
 	/** Flag indicating whether to sysout print execution updates */
 	private boolean printStatusDuringExecution = true;
 
 	/**
-	 * If != -1, this field specifies the total number of available slots on the cluster
-	 * connected to the client.
+	 *  For interactive invocations, the Job ID is only available after the ContextEnvironment has
+	 *  been run inside the user JAR. We pass the Client to every instance of the ContextEnvironment
+	 *  which lets us access the last JobID here.
 	 */
-	private int maxSlots;
+	private JobID lastJobID;
 
-	/** ID of the last job submitted with this client. */
-	private JobID lastJobId = null;
-	
-	
 	// ------------------------------------------------------------------------
 	//                            Construction
 	// ------------------------------------------------------------------------
 
 	/**
 	 * Creates an instance that submits the programs to the JobManager defined in the
-	 * configuration. It sets the maximum number of slots to unknown (= -1).
+	 * configuration. This method will try to resolve the JobManager hostname and throw an exception
+	 * if that is not possible.
 	 *
-	 * @param config The config used to obtain the JobManager's address.
-	 * @param userCodeClassLoader The class loader to use for loading user code classes.
+	 * @param config The config used to obtain the job-manager's address, and used to configure the optimizer.
+	 * 
+	 * @throws java.io.IOException Thrown, if the client's actor system could not be started.
+	 * @throws java.net.UnknownHostException Thrown, if the JobManager's hostname could not be resolved.
 	 */
-	public Client(Configuration config, ClassLoader userCodeClassLoader) {
-		this(config, userCodeClassLoader, -1);
+	public Client(Configuration config) throws IOException {
+		this(config, -1);
 	}
 
 	/**
-	 * Creates a instance that submits the programs to the JobManager defined in the
-	 * configuration.
+	 * Creates a new instance of the class that submits the jobs to the job-manager
+	 * defined in the given configuration.
 	 * 
-	 * @param config The config used to obtain the JobManager's address.
-	 * @param userCodeClassLoader The class loader to use for loading user code classes.
-	 * @param maxSlots The number of maxSlots on the cluster if != -1
+	 * @param config The configuration for the client-side processes, like the optimizer.
+	 * @param maxSlots The total number of available slots on the cluster, or -1 if unknown.
+	 *                    
+	 * @throws java.io.IOException Thrown, if the client's actor system could not be started.
+	 * @throws java.net.UnknownHostException Thrown, if the JobManager's hostname could not be resolved.   
 	 */
-	public Client(Configuration config, ClassLoader userCodeClassLoader, int maxSlots) {
-		Preconditions.checkNotNull(config, "Configuration is null");
-		Preconditions.checkNotNull(userCodeClassLoader, "User code ClassLoader is null");
-		
-		this.configuration = config;
-		this.userCodeClassLoader = userCodeClassLoader;
+	public Client(Configuration config, int maxSlots) throws IOException {
 
-		this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), configuration);
+		this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
 		this.maxSlots = maxSlots;
+
+		LOG.info("Starting client actor system");
+
+		try {
+			this.actorSystem = JobClient.startJobClientActorSystem(config);
+		} catch (Exception e) {
+			throw new IOException("Could start client actor system.", e);
+		}
+
+		// from here on, we need to make sure the actor system is shut down on error
+		boolean success = false;
+
+		try {
+
+			FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(config);
+			this.timeout = AkkaUtils.getTimeout(config);
+
+			LOG.info("Looking up JobManager");
+			LeaderRetrievalService leaderRetrievalService;
+
+			try {
+				leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(config);
+			} catch (Exception e) {
+				throw new IOException("Could not create the leader retrieval service.", e);
+			}
+
+			try {
+				this.jobManagerGateway = LeaderRetrievalUtils.retrieveLeaderGateway(
+						leaderRetrievalService,
+						actorSystem,
+						lookupTimeout);
+			} catch (LeaderRetrievalException e) {
+				throw new IOException("Failed to retrieve JobManager gateway", e);
+			}
+
+			LOG.info("Leading JobManager actor system address is " + this.jobManagerGateway.path());
+
+			LOG.info("JobManager runs at " + this.jobManagerGateway.path());
+
+			LOG.info("Communication between client and JobManager will have a timeout of " + this.timeout);
+			success = true;
+		} finally {
+			if (!success) {
+				try {
+					this.actorSystem.shutdown();
+
+					// wait at most for 30 seconds, to work around an occasional akka problem
+					actorSystem.awaitTermination(new FiniteDuration(30, TimeUnit.SECONDS));
+				} catch (Throwable t) {
+					LOG.error("Shutting down actor system after error caused another error", t);
+				}
+			}
+		}
 	}
+	// ------------------------------------------------------------------------
+	//  Startup & Shutdown
+	// ------------------------------------------------------------------------
 
 	/**
+	 * Shuts down the client. This stops the internal actor system and actors.
+	 */
+	public void shutdown() {
+		if (!this.actorSystem.isTerminated()) {
+			this.actorSystem.shutdown();
+			this.actorSystem.awaitTermination();
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Configuration
+	// ------------------------------------------------------------------------
+	
+	/**
 	 * Configures whether the client should print progress updates during the execution to {@code System.out}.
 	 * All updates are logged via the SLF4J loggers regardless of this setting.
 	 * 
@@ -159,118 +232,84 @@ public class Client {
 	}
 	
 	// ------------------------------------------------------------------------
-	//                      Compilation and Submission
+	//  Access to the Program's Plan
 	// ------------------------------------------------------------------------
 	
-	public String getOptimizedPlanAsJson(PackagedProgram prog, int parallelism) throws CompilerException, ProgramInvocationException {
+	public static String getOptimizedPlanAsJson(Optimizer compiler, PackagedProgram prog, int parallelism)
+			throws CompilerException, ProgramInvocationException
+	{
 		PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
-		return jsonGen.getOptimizerPlanAsJSON((OptimizedPlan) getOptimizedPlan(prog, parallelism));
+		return jsonGen.getOptimizerPlanAsJSON((OptimizedPlan) getOptimizedPlan(compiler, prog, parallelism));
 	}
-	
-	public FlinkPlan getOptimizedPlan(PackagedProgram prog, int parallelism) throws CompilerException, ProgramInvocationException {
+
+	public static FlinkPlan getOptimizedPlan(Optimizer compiler, PackagedProgram prog, int parallelism)
+			throws CompilerException, ProgramInvocationException
+	{
 		Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
 		if (prog.isUsingProgramEntryPoint()) {
-			return getOptimizedPlan(prog.getPlanWithJars(), parallelism);
-		}
-		else if (prog.isUsingInteractiveMode()) {
+			return getOptimizedPlan(compiler, prog.getPlanWithJars(), parallelism);
+		} else if (prog.isUsingInteractiveMode()) {
 			// temporary hack to support the optimizer plan preview
-			OptimizerPlanEnvironment env = new OptimizerPlanEnvironment(this.compiler);
+			OptimizerPlanEnvironment env = new OptimizerPlanEnvironment(compiler);
 			if (parallelism > 0) {
 				env.setParallelism(parallelism);
 			}
-			env.setAsContext();
-			
-			// temporarily write syserr and sysout to a byte array.
-			PrintStream originalOut = System.out;
-			PrintStream originalErr = System.err;
-			ByteArrayOutputStream baos = new ByteArrayOutputStream();
-			System.setOut(new PrintStream(baos));
-			ByteArrayOutputStream baes = new ByteArrayOutputStream();
-			System.setErr(new PrintStream(baes));
-			try {
-				ContextEnvironment.enableLocalExecution(false);
-				prog.invokeInteractiveModeForExecution();
-			}
-			catch (ProgramInvocationException e) {
-				throw e;
-			}
-			catch (Throwable t) {
-				// the invocation gets aborted with the preview plan
-				if (env.optimizerPlan != null) {
-					return env.optimizerPlan;
-				} else {
-					throw new ProgramInvocationException("The program caused an error: ", t);
-				}
-			}
-			finally {
-				ContextEnvironment.enableLocalExecution(true);
-				System.setOut(originalOut);
-				System.setErr(originalErr);
-				System.err.println(baes);
-				System.out.println(baos);
-			}
-			
-			throw new ProgramInvocationException(
-					"The program plan could not be fetched - the program aborted pre-maturely.\n"
-					+ "System.err: " + baes.toString() + '\n'
-					+ "System.out: " + baos.toString() + '\n');
-		}
-		else {
-			throw new RuntimeException();
+
+			return env.getOptimizedPlan(prog);
+		} else {
+			throw new RuntimeException("Couldn't determine program mode.");
 		}
 	}
-	
-	public FlinkPlan getOptimizedPlan(Plan p, int parallelism) throws CompilerException {
+
+	public static OptimizedPlan getOptimizedPlan(Optimizer compiler, Plan p, int parallelism) throws CompilerException {
 		if (parallelism > 0 && p.getDefaultParallelism() <= 0) {
-			LOG.debug("Changing plan default parallelism from {} to {}",p.getDefaultParallelism(), parallelism);
+			LOG.debug("Changing plan default parallelism from {} to {}", p.getDefaultParallelism(), parallelism);
 			p.setDefaultParallelism(parallelism);
 		}
 		LOG.debug("Set parallelism {}, plan default parallelism {}", parallelism, p.getDefaultParallelism());
 
-		return this.compiler.compile(p);
-	}
-	
-	
-	/**
-	 * Creates the optimized plan for a given program, using this client's compiler.
-	 *  
-	 * @param prog The program to be compiled.
-	 * @return The compiled and optimized plan, as returned by the compiler.
-	 * @throws CompilerException Thrown, if the compiler encounters an illegal situation.
-	 * @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file.
-	 */
-	public FlinkPlan getOptimizedPlan(JobWithJars prog, int parallelism) throws CompilerException, ProgramInvocationException {
-		return getOptimizedPlan(prog.getPlan(), parallelism);
-	}
-	
-	public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan) throws ProgramInvocationException {
-		return getJobGraph(optPlan, prog.getAllLibraries());
+		return compiler.compile(p);
 	}
-	
-	private JobGraph getJobGraph(FlinkPlan optPlan, List<File> jarFiles) {
-		JobGraph job;
-		if (optPlan instanceof StreamingPlan) {
-			job = ((StreamingPlan) optPlan).getJobGraph();
-		} else {
-			JobGraphGenerator gen = new JobGraphGenerator(this.configuration);
-			job = gen.compileJobGraph((OptimizedPlan) optPlan);
-		}
 
-		for (File jar : jarFiles) {
-			job.addJar(new Path(jar.getAbsolutePath()));
+	// ------------------------------------------------------------------------
+	//  Program submission / execution
+	// ------------------------------------------------------------------------
+
+	public JobExecutionResult runBlocking(PackagedProgram prog, int parallelism) throws ProgramInvocationException {
+		Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
+		if (prog.isUsingProgramEntryPoint()) {
+			return runBlocking(prog.getPlanWithJars(), parallelism);
 		}
+		else if (prog.isUsingInteractiveMode()) {
+			LOG.info("Starting program in interactive mode");
+			ContextEnvironment.setAsContext(this, prog.getAllLibraries(), prog.getUserCodeClassLoader(), parallelism, true);
+			ContextEnvironment.enableLocalExecution(false);
 
-		return job;
+			// invoke here
+			try {
+				prog.invokeInteractiveModeForExecution();
+			}
+			finally {
+				ContextEnvironment.enableLocalExecution(true);
+			}
+
+			return JobExecutionResult.fromJobSubmissionResult(new JobSubmissionResult(lastJobID));
+		}
+		else {
+			throw new RuntimeException("PackagedProgram does not have a valid invocation mode.");
+		}
 	}
 
-	public JobSubmissionResult run(final PackagedProgram prog, int parallelism, boolean wait) throws ProgramInvocationException {
+	public JobSubmissionResult runDetached(PackagedProgram prog, int parallelism)
+			throws ProgramInvocationException
+	{
 		Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
 		if (prog.isUsingProgramEntryPoint()) {
-			return run(prog.getPlanWithJars(), parallelism, wait);
+			return runDetached(prog.getPlanWithJars(), parallelism);
 		}
 		else if (prog.isUsingInteractiveMode()) {
 			LOG.info("Starting program in interactive mode");
-			ContextEnvironment.setAsContext(this, prog.getAllLibraries(), prog.getUserCodeClassLoader(), parallelism, wait);
+			ContextEnvironment.setAsContext(this, prog.getAllLibraries(), prog.getUserCodeClassLoader(), parallelism, false);
 			ContextEnvironment.enableLocalExecution(false);
 
 			// invoke here
@@ -281,113 +320,108 @@ public class Client {
 				ContextEnvironment.enableLocalExecution(true);
 			}
 
-			// Job id has been set in the Client passed to the ContextEnvironment
-			return new JobSubmissionResult(lastJobId);
+			return new JobSubmissionResult(lastJobID);
 		}
 		else {
-			throw new RuntimeException();
+			throw new RuntimeException("PackagedProgram does not have a valid invocation mode.");
 		}
 	}
-	
-	public JobSubmissionResult run(PackagedProgram prog, OptimizedPlan optimizedPlan, boolean wait) throws ProgramInvocationException {
-		return run(optimizedPlan, prog.getAllLibraries(), wait);
 
-	}
-	
 	/**
-	 * Runs a program on Flink cluster whose job-manager is configured in this client's configuration.
-	 * This method involves all steps, from compiling, job-graph generation to submission.
-	 * 
-	 * @param prog The program to be executed.
+	 * Runs a program on the Flink cluster to which this client is connected. The call blocks until the
+	 * execution is complete, and returns afterwards.
+	 *
+	 * @param program The program to be executed.
 	 * @param parallelism The default parallelism to use when running the program. The default parallelism is used
 	 *                    when the program does not set a parallelism by itself.
-	 * @param wait A flag that indicates whether this function call should block until the program execution is done.
+	 *
 	 * @throws CompilerException Thrown, if the compiler encounters an illegal situation.
 	 * @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file,
 	 *                                    or if the submission failed. That might be either due to an I/O problem,
 	 *                                    i.e. the job-manager is unreachable, or due to the fact that the
 	 *                                    parallel execution failed.
 	 */
-	public JobSubmissionResult run(JobWithJars prog, int parallelism, boolean wait) throws CompilerException, ProgramInvocationException {
-		return run((OptimizedPlan) getOptimizedPlan(prog, parallelism), prog.getJarFiles(), wait);
+	public JobExecutionResult runBlocking(JobWithJars program, int parallelism) 
+			throws CompilerException, ProgramInvocationException
+	{
+		ClassLoader classLoader = program.getUserCodeClassLoader();
+		if (classLoader == null) {
+			throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
+		}
+
+		OptimizedPlan optPlan = getOptimizedPlan(compiler, program, parallelism);
+		return runBlocking(optPlan, program.getJarFiles(), classLoader);
+	}
+
+	/**
+	 * Submits a program to the Flink cluster to which this client is connected. The call returns after the
+	 * program was submitted and does not wait for the program to complete.
+	 *
+	 * @param program The program to be executed.
+	 * @param parallelism The default parallelism to use when running the program. The default parallelism is used
+	 *                    when the program does not set a parallelism by itself.
+	 *
+	 * @throws CompilerException Thrown, if the compiler encounters an illegal situation.
+	 * @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file,
+	 *                                    or if the submission failed. That might be either due to an I/O problem,
+	 *                                    i.e. the job-manager is unreachable.
+	 */
+	public JobSubmissionResult runDetached(JobWithJars program, int parallelism)
+			throws CompilerException, ProgramInvocationException
+	{
+		ClassLoader classLoader = program.getUserCodeClassLoader();
+		if (classLoader == null) {
+			throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
+		}
+
+		OptimizedPlan optimizedPlan = getOptimizedPlan(compiler, program, parallelism);
+		return runDetached(optimizedPlan, program.getJarFiles(), classLoader);
 	}
 	
 
-	public JobSubmissionResult run(OptimizedPlan compiledPlan, List<File> libraries, boolean wait) throws ProgramInvocationException {
+	public JobExecutionResult runBlocking(OptimizedPlan compiledPlan, List<File> libraries, ClassLoader classLoader)
+			throws ProgramInvocationException
+	{
 		JobGraph job = getJobGraph(compiledPlan, libraries);
-		this.lastJobId = job.getJobID();
-		return run(job, wait);
+		return runBlocking(job, classLoader);
 	}
 
-	public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws ProgramInvocationException {
-		this.lastJobId = jobGraph.getJobID();
-		
-		LOG.info("Starting client actor system");
-		final ActorSystem actorSystem;
+	public JobSubmissionResult runDetached(OptimizedPlan compiledPlan, List<File> libraries, ClassLoader classLoader)
+			throws ProgramInvocationException
+	{
+		JobGraph job = getJobGraph(compiledPlan, libraries);
+		return runDetached(job, classLoader);
+	}
+
+	public JobExecutionResult runBlocking(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
+		LOG.info("Checking and uploading JAR files");
 		try {
-			actorSystem = JobClient.startJobClientActorSystem(configuration);
+			JobClient.uploadJarFiles(jobGraph, jobManagerGateway, timeout);
+		} catch (IOException e) {
+			throw new ProgramInvocationException("Could not upload the program's JAR files to the JobManager.", e);
 		}
-		catch (Exception e) {
-			throw new ProgramInvocationException("Could start client actor system.", e);
+		try {
+			this.lastJobID = jobGraph.getJobID();
+			return JobClient.submitJobAndWait(actorSystem, jobManagerGateway, jobGraph, timeout, printStatusDuringExecution, classLoader);
+		} catch (JobExecutionException e) {
+			throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
 		}
+	}
 
+	public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
+		LOG.info("Checking and uploading JAR files");
 		try {
-			FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(configuration);
-			FiniteDuration timeout = AkkaUtils.getTimeout(configuration);
-
-			LOG.info("Looking up JobManager");
-			ActorGateway jobManagerGateway;
-
-			LeaderRetrievalService leaderRetrievalService;
-
-			try {
-				leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
-			} catch (Exception e) {
-				throw new ProgramInvocationException("Could not create the leader retrieval service.", e);
-			}
-
-			try {
-				jobManagerGateway = LeaderRetrievalUtils.retrieveLeaderGateway(
-						leaderRetrievalService,
-						actorSystem,
-						lookupTimeout);
-			} catch (LeaderRetrievalException e) {
-				throw new ProgramInvocationException("Failed to retrieve JobManager gateway", e);
-			}
-
-			LOG.info("Leading JobManager actor system address is " + jobManagerGateway.path());
-
-			LOG.info("JobManager runs at " + jobManagerGateway.path());
-
-			LOG.info("Communication between client and JobManager will have a timeout of " + timeout);
-
-			LOG.info("Checking and uploading JAR files");
-			try {
-				JobClient.uploadJarFiles(jobGraph, jobManagerGateway, timeout);
-			} catch (IOException e) {
-				throw new ProgramInvocationException("Could not upload the program's JAR files to the JobManager.", e);
-			}
-
-			try {
-				if (wait) {
-					return JobClient.submitJobAndWait(actorSystem,
-						jobManagerGateway, jobGraph, timeout, printStatusDuringExecution, userCodeClassLoader);
-				} else {
-					JobClient.submitJobDetached(jobManagerGateway, jobGraph, timeout, userCodeClassLoader);
-					// return a dummy execution result with the JobId
-					return new JobSubmissionResult(jobGraph.getJobID());
-				}
-			} catch (JobExecutionException e) {
+			JobClient.uploadJarFiles(jobGraph, jobManagerGateway, timeout);
+		}
+		catch (IOException e) {
+			throw new ProgramInvocationException("Could not upload the program's JAR files to the JobManager.", e);
+		}
+		try {
+			this.lastJobID = jobGraph.getJobID();
+			JobClient.submitJobDetached(jobManagerGateway, jobGraph, timeout, classLoader);
+			return new JobSubmissionResult(jobGraph.getJobID());
+		} catch (JobExecutionException e) {
 				throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
-			} catch (Exception e) {
-				throw new ProgramInvocationException("Exception during program execution.", e);
-			}
-		} finally {
-			// shut down started actor system
-			actorSystem.shutdown();
-			
-			// wait at most for 30 seconds, to work around an occasional akka problem
-			actorSystem.awaitTermination(new FiniteDuration(30, TimeUnit.SECONDS));
 		}
 	}
 
@@ -397,62 +431,26 @@ public class Client {
 	 * @throws Exception In case an error occurred.
 	 */
 	public void cancel(JobID jobId) throws Exception {
-		final FiniteDuration timeout = AkkaUtils.getTimeout(configuration);
-		final FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(configuration);
-
-		ActorSystem actorSystem;
+		Future<Object> response;
 		try {
-			actorSystem = JobClient.startJobClientActorSystem(configuration);
+			response = jobManagerGateway.ask(new JobManagerMessages.CancelJob(jobId), timeout);
 		} catch (Exception e) {
-			throw new ProgramInvocationException("Could start client actor system.", e);
+			throw new ProgramInvocationException("Failed to query the job manager gateway.", e);
 		}
 
-		try {
-			ActorGateway jobManagerGateway;
-
-			LeaderRetrievalService leaderRetrievalService;
+		Object result = Await.result(response, timeout);
 
-			try {
-				leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
-			} catch (Exception e) {
-				throw new ProgramInvocationException("Could not create the leader retrieval service.", e);
-			}
-
-			try {
-				jobManagerGateway = LeaderRetrievalUtils.retrieveLeaderGateway(
-						leaderRetrievalService,
-						actorSystem,
-						lookupTimeout);
-			} catch (LeaderRetrievalException e) {
-				throw new ProgramInvocationException("Failed to retrieve JobManager gateway", e);
-			}
-
-			Future<Object> response;
-			try {
-				response = jobManagerGateway.ask(new JobManagerMessages.CancelJob(jobId), timeout);
-			} catch (Exception e) {
-				throw new ProgramInvocationException("Failed to query the job manager gateway.", e);
-			}
-			
-			Object result = Await.result(response, timeout);
-
-			if (result instanceof JobManagerMessages.CancellationSuccess) {
-				LOG.debug("Job cancellation with ID " + jobId + " succeeded.");
-			} else if (result instanceof JobManagerMessages.CancellationFailure) {
-				Throwable t = ((JobManagerMessages.CancellationFailure) result).cause();
-				LOG.debug("Job cancellation with ID " + jobId + " failed.", t);
-				throw new Exception("Failed to cancel the job because of \n" + t.getMessage());
-			} else {
-				throw new Exception("Unknown message received while cancelling.");
-			}
-		} finally {
-			// shut down started actor system
-			actorSystem.shutdown();
-			actorSystem.awaitTermination();
+		if (result instanceof JobManagerMessages.CancellationSuccess) {
+			LOG.debug("Job cancellation with ID " + jobId + " succeeded.");
+		} else if (result instanceof JobManagerMessages.CancellationFailure) {
+			Throwable t = ((JobManagerMessages.CancellationFailure) result).cause();
+			LOG.debug("Job cancellation with ID " + jobId + " failed.", t);
+			throw new Exception("Failed to cancel the job because of \n" + t.getMessage());
+		} else {
+			throw new Exception("Unknown message received while cancelling.");
 		}
 	}
 
-
 	/**
 	 * Requests and returns the accumulators for the given job identifier. Accumulators can be
 	 * requested while a job is running or after it has finished. The default class loader is used
@@ -473,117 +471,98 @@ public class Client {
 	 */
 	public Map<String, Object> getAccumulators(JobID jobID, ClassLoader loader) throws Exception {
 
-		final FiniteDuration timeout = AkkaUtils.getTimeout(configuration);
-		final FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(configuration);
-
-		ActorSystem actorSystem;
+		Future<Object> response;
 		try {
-			actorSystem = JobClient.startJobClientActorSystem(configuration);
+			response = jobManagerGateway.ask(new RequestAccumulatorResults(jobID), timeout);
 		} catch (Exception e) {
-			throw new Exception("Could start client actor system.", e);
+			throw new Exception("Failed to query the job manager gateway for accumulators.", e);
 		}
 
-		try {
-			ActorGateway jobManagerGateway;
-
-			LeaderRetrievalService leaderRetrievalService;
-
-			try {
-				leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
-			} catch (Exception e) {
-				throw new ProgramInvocationException("Could not create the leader retrieval service.", e);
-			}
-
-			try {
-				jobManagerGateway = LeaderRetrievalUtils.retrieveLeaderGateway(
-						leaderRetrievalService,
-						actorSystem,
-						lookupTimeout);
-			} catch (LeaderRetrievalException e) {
-				throw new ProgramInvocationException("Failed to retrieve JobManager gateway", e);
-			}
-
-			Future<Object> response;
-			try {
-				response = jobManagerGateway.ask(new RequestAccumulatorResults(jobID), timeout);
-			} catch (Exception e) {
-				throw new Exception("Failed to query the job manager gateway for accumulators.", e);
-			}
-
-			Object result = Await.result(response, timeout);
+		Object result = Await.result(response, timeout);
 
-			if (result instanceof AccumulatorResultsFound) {
-				Map<String, SerializedValue<Object>> serializedAccumulators =
-						((AccumulatorResultsFound) result).result();
+		if (result instanceof AccumulatorResultsFound) {
+			Map<String, SerializedValue<Object>> serializedAccumulators =
+					((AccumulatorResultsFound) result).result();
 
-				return AccumulatorHelper.deserializeAccumulators(serializedAccumulators, loader);
+			return AccumulatorHelper.deserializeAccumulators(serializedAccumulators, loader);
 
-			} else if (result instanceof AccumulatorResultsErroneous) {
-				throw ((AccumulatorResultsErroneous) result).cause();
-			} else {
-				throw new Exception("Failed to fetch accumulators for the job " + jobID + ".");
-			}
-		} finally {
-			actorSystem.shutdown();
-			actorSystem.awaitTermination();
+		} else if (result instanceof AccumulatorResultsErroneous) {
+			throw ((AccumulatorResultsErroneous) result).cause();
+		} else {
+			throw new Exception("Failed to fetch accumulators for the job " + jobID + ".");
 		}
 	}
 
 
-	// --------------------------------------------------------------------------------------------
+	// ------------------------------------------------------------------------
+	//  Sessions
+	// ------------------------------------------------------------------------
 	
-	public static final class OptimizerPlanEnvironment extends ExecutionEnvironment {
-		
-		private final Optimizer compiler;
-		
-		private FlinkPlan optimizerPlan;
-		
-		
-		private OptimizerPlanEnvironment(Optimizer compiler) {
-			this.compiler = compiler;
-		}
-		
-		@Override
-		public JobExecutionResult execute(String jobName) throws Exception {
-			Plan plan = createProgramPlan(jobName);
-			this.optimizerPlan = compiler.compile(plan);
-			
-			// do not go on with anything now!
-			throw new ProgramAbortException();
+	/**
+	 * Tells the JobManager to finish the session (job) defined by the given ID.
+	 * 
+	 * @param jobId The ID that identifies the session.
+	 */
+	public void endSession(JobID jobId) throws Exception {
+		if (jobId == null) {
+			throw new IllegalArgumentException("The JobID must not be null.");
 		}
+		endSessions(Collections.singletonList(jobId));
+	}
 
-		@Override
-		public String getExecutionPlan() throws Exception {
-			Plan plan = createProgramPlan(null, false);
-			this.optimizerPlan = compiler.compile(plan);
-			
-			// do not go on with anything now!
-			throw new ProgramAbortException();
-		}
-		
-		private void setAsContext() {
-			ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
-				
-				@Override
-				public ExecutionEnvironment createExecutionEnvironment() {
-					return OptimizerPlanEnvironment.this;
-				}
-			};
-			initializeContextEnvironment(factory);
+	/**
+	 * Tells the JobManager to finish the sessions (jobs) defined by the given IDs.
+	 *
+	 * @param jobIds The IDs that identify the sessions.
+	 */
+	public void endSessions(List<JobID> jobIds) throws Exception {
+		if (jobIds == null) {
+			throw new IllegalArgumentException("The JobIDs must not be null");
 		}
 		
-		public void setPlan(FlinkPlan plan){
-			this.optimizerPlan = plan;
+		for (JobID jid : jobIds) {
+			if (jid != null) {
+				LOG.info("Telling job manager to end the session {}.", jid);
+				jobManagerGateway.tell(new JobManagerMessages.RemoveCachedJob(jid));
+			}
 		}
 	}
 
-	// --------------------------------------------------------------------------------------------
+	// ------------------------------------------------------------------------
+	//  Internal translation methods
+	// ------------------------------------------------------------------------
 
 	/**
-	 * A special exception used to abort programs when the caller is only interested in the
-	 * program plan, rather than in the full execution.
+	 * Creates the optimized plan for a given program, using this client's compiler.
+	 *
+	 * @param prog The program to be compiled.
+	 * @return The compiled and optimized plan, as returned by the compiler.
+	 * @throws CompilerException Thrown, if the compiler encounters an illegal situation.
+	 * @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file.
 	 */
-	public static final class ProgramAbortException extends Error {
-		private static final long serialVersionUID = 1L;
+	private static OptimizedPlan getOptimizedPlan(Optimizer compiler, JobWithJars prog, int parallelism)
+			throws CompilerException, ProgramInvocationException
+	{
+		return getOptimizedPlan(compiler, prog.getPlan(), parallelism);
 	}
+
+	public static JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan) throws ProgramInvocationException {
+		return getJobGraph(optPlan, prog.getAllLibraries());
+	}
+
+	private static JobGraph getJobGraph(FlinkPlan optPlan, List<File> jarFiles) {
+		JobGraph job;
+		if (optPlan instanceof StreamingPlan) {
+			job = ((StreamingPlan) optPlan).getJobGraph();
+		} else {
+			JobGraphGenerator gen = new JobGraphGenerator();
+			job = gen.compileJobGraph((OptimizedPlan) optPlan);
+		}
+
+		for (File jar : jarFiles) {
+			job.addJar(new Path(jar.getAbsolutePath()));
+		}
+
+		return job;
+	}
+
 }
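
The Client rework replaces the boolean "wait" flag of run() with the explicitly
named runBlocking() and runDetached() entry points, and moves the actor system
startup and JobManager lookup into the constructor, so one connection is reused
until shutdown(). A minimal sketch of the new call pattern, not part of the
commit; the JobWithJars value is assumed to exist, and the Configuration is
assumed to point at a running JobManager.

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.JobWithJars;
import org.apache.flink.configuration.Configuration;

public class ClientUsageSketch {

	static void submit(JobWithJars program, boolean detached) throws Exception {
		// The constructor starts the actor system and resolves the JobManager;
		// the connection lives until shutdown() is called.
		Client client = new Client(new Configuration());
		try {
			if (detached) {
				// Returns right after submission, without waiting for the result.
				JobSubmissionResult submitted = client.runDetached(program, 1);
				System.out.println("Submitted job " + submitted.getJobID());
			} else {
				// Blocks until the job has finished.
				JobExecutionResult result = client.runBlocking(program, 1);
				System.out.println("Net runtime: " + result.getNetRuntime() + " ms");
			}
		} finally {
			client.shutdown();
		}
	}
}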

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index 9287017..ad14a06 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -22,6 +22,7 @@ import java.io.File;
 import java.util.List;
 
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -39,15 +40,14 @@ public class ContextEnvironment extends ExecutionEnvironment {
 	private static final Logger LOG = LoggerFactory.getLogger(ContextEnvironment.class);
 
 	private final Client client;
-	
+
 	private final List<File> jarFilesToAttach;
-	
+
 	private final ClassLoader userCodeClassLoader;
 
 	private final boolean wait;
-	
-	
-	
+
+
 	public ContextEnvironment(Client remoteConnection, List<File> jarFiles, ClassLoader userCodeClassLoader, boolean wait) {
 		this.client = remoteConnection;
 		this.jarFilesToAttach = jarFiles;
@@ -60,27 +60,33 @@ public class ContextEnvironment extends ExecutionEnvironment {
 		Plan p = createProgramPlan(jobName);
 		JobWithJars toRun = new JobWithJars(p, this.jarFilesToAttach, this.userCodeClassLoader);
 
-		JobSubmissionResult result = this.client.run(toRun, getParallelism(), wait);
-		if(result instanceof JobExecutionResult) {
-			this.lastJobExecutionResult = (JobExecutionResult) result;
-			return (JobExecutionResult) result;
-		} else {
-			LOG.warn("The Client didn't return a JobExecutionResult");
-			this.lastJobExecutionResult = new JobExecutionResult(result.getJobID(), -1, null);
+		if (wait) {
+			this.lastJobExecutionResult = client.runBlocking(toRun, getParallelism());
+			return this.lastJobExecutionResult;
+		}
+		else {
+			JobSubmissionResult result = client.runDetached(toRun, getParallelism());
+			LOG.warn("Job was executed in detached mode, the results will be available on completion.");
+			this.lastJobExecutionResult = JobExecutionResult.fromJobSubmissionResult(result);
 			return this.lastJobExecutionResult;
 		}
 	}
 
 	@Override
 	public String getExecutionPlan() throws Exception {
-		Plan p = createProgramPlan("unnamed job");
-		
-		OptimizedPlan op = (OptimizedPlan) this.client.getOptimizedPlan(p, getParallelism());
+		Plan plan = createProgramPlan("unnamed job");
 
+		OptimizedPlan op = Client.getOptimizedPlan(client.compiler, plan, getParallelism());
 		PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();
 		return gen.getOptimizerPlanAsJSON(op);
 	}
 
+	@Override
+	public void startNewSession() throws Exception {
+		client.endSession(jobID);
+		jobID = JobID.generate();
+	}
+
 	public boolean isWait() {
 		return wait;
 	}
@@ -104,7 +110,9 @@ public class ContextEnvironment extends ExecutionEnvironment {
 	static void setAsContext(Client client, List<File> jarFilesToAttach, 
 				ClassLoader userCodeClassLoader, int defaultParallelism, boolean wait)
 	{
-		initializeContextEnvironment(new ContextEnvironmentFactory(client, jarFilesToAttach, userCodeClassLoader, defaultParallelism, wait));
+		ContextEnvironmentFactory factory =
+				new ContextEnvironmentFactory(client, jarFilesToAttach, userCodeClassLoader, defaultParallelism, wait);
+		initializeContextEnvironment(factory);
 	}
 	
 	protected static void enableLocalExecution(boolean enabled) {
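
One usage note on the execute() rewrite above: the wait flag now forks into two distinct client calls, where runBlocking returns a full JobExecutionResult once the job has finished and runDetached returns immediately after submission. A minimal self-contained sketch of the same split (Client and JobWithJars as in this commit; the setup of the arguments is assumed):

	import org.apache.flink.api.common.JobExecutionResult;
	import org.apache.flink.api.common.JobSubmissionResult;
	import org.apache.flink.client.program.Client;
	import org.apache.flink.client.program.JobWithJars;

	class SubmissionSketch {
		static JobExecutionResult submit(Client client, JobWithJars toRun,
				int parallelism, boolean wait) throws Exception {
			if (wait) {
				// blocking: returns once the job has finished
				return client.runBlocking(toRun, parallelism);
			}
			else {
				// detached: returns right after submission; wrap the result so
				// callers still receive a JobExecutionResult
				JobSubmissionResult result = client.runDetached(toRun, parallelism);
				return JobExecutionResult.fromJobSubmissionResult(result);
			}
		}
	}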

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java b/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
index b86487f..9e84e2d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.client.program;
 
 import java.io.File;
@@ -30,6 +29,10 @@ import java.util.List;
 
 import org.apache.flink.api.common.Plan;
 
+/**
+ * A JobWithJars is a Flink dataflow plan together with the JAR files that contain
+ * the classes of the user functions and the libraries necessary for its execution.
+ */
 public class JobWithJars {
 	
 	private Plan plan;
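
With the Javadoc in place, construction is straightforward. A short sketch, with the constructor arguments taken from their use in ContextEnvironment above (the Plan would come from ExecutionEnvironment#createProgramPlan; the jar path is a placeholder):

	// Sketch only: plan is produced elsewhere, as in ContextEnvironment#execute.
	List<File> jars = Collections.singletonList(new File("/path/to/job.jar"));
	JobWithJars toRun = new JobWithJars(plan, jars,
			Thread.currentThread().getContextClassLoader());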

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
new file mode 100644
index 0000000..c9c3b45
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironmentFactory;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.plan.FlinkPlan;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+
+public class OptimizerPlanEnvironment extends ExecutionEnvironment {
+
+	private final Optimizer compiler;
+
+	private FlinkPlan optimizerPlan;
+
+	public OptimizerPlanEnvironment(Optimizer compiler) {
+		this.compiler = compiler;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Execution Environment methods
+	// ------------------------------------------------------------------------
+	
+	@Override
+	public JobExecutionResult execute(String jobName) throws Exception {
+		Plan plan = createProgramPlan(jobName);
+		this.optimizerPlan = compiler.compile(plan);
+
+		// do not go on with anything now!
+		throw new ProgramAbortException();
+	}
+
+	@Override
+	public String getExecutionPlan() throws Exception {
+		Plan plan = createProgramPlan(null, false);
+		this.optimizerPlan = compiler.compile(plan);
+
+		// do not go on with anything now!
+		throw new ProgramAbortException();
+	}
+
+	@Override
+	public void startNewSession() {
+		// do nothing
+	}
+
+	public FlinkPlan getOptimizedPlan(PackagedProgram prog) throws ProgramInvocationException {
+		setAsContext();
+
+		// temporarily redirect System.out and System.err into byte arrays.
+		PrintStream originalOut = System.out;
+		PrintStream originalErr = System.err;
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		System.setOut(new PrintStream(baos));
+		ByteArrayOutputStream baes = new ByteArrayOutputStream();
+		System.setErr(new PrintStream(baes));
+		try {
+			ContextEnvironment.enableLocalExecution(false);
+			prog.invokeInteractiveModeForExecution();
+		}
+		catch (ProgramInvocationException e) {
+			throw e;
+		}
+		catch (Throwable t) {
+			// the invocation is expected to abort once the optimizer plan has been captured
+			if (optimizerPlan != null) {
+				return optimizerPlan;
+			} else {
+				throw new ProgramInvocationException("The program caused an error.", t);
+			}
+		}
+		finally {
+			ContextEnvironment.enableLocalExecution(true);
+			System.setOut(originalOut);
+			System.setErr(originalErr);
+			System.err.println(baes);
+			System.out.println(baos);
+		}
+
+		throw new ProgramInvocationException(
+				"The program plan could not be fetched - the program aborted prematurely.\n"
+						+ "System.err: " + baes.toString() + '\n'
+						+ "System.out: " + baos.toString() + '\n');
+	}
+	// ------------------------------------------------------------------------
+
+	private void setAsContext() {
+		ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
+
+			@Override
+			public ExecutionEnvironment createExecutionEnvironment() {
+				return OptimizerPlanEnvironment.this;
+			}
+		};
+		initializeContextEnvironment(factory);
+	}
+
+	// ------------------------------------------------------------------------
+	
+	public void setPlan(FlinkPlan plan){
+		this.optimizerPlan = plan;
+	}
+
+	/**
+	 * A special exception used to abort programs when the caller is only interested in the
+	 * program plan, rather than in the full execution.
+	 */
+	public static final class ProgramAbortException extends Error {
+		private static final long serialVersionUID = 1L;
+	}
+}
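
A note on the control flow of this new class, since it is easy to miss: setAsContext() makes this environment the one a user program obtains, so when the program calls execute(), the plan is compiled and a ProgramAbortException unwinds the program's main method. It extends Error rather than Exception, presumably so that user code catching Exception cannot swallow it; getOptimizedPlan(...) then picks the captured plan out of the Throwable branch. A hedged usage sketch (compiler and program stand for an Optimizer and a PackagedProgram, as elsewhere in this commit):

	OptimizerPlanEnvironment env = new OptimizerPlanEnvironment(compiler);
	try {
		FlinkPlan plan = env.getOptimizedPlan(program);
		// plan holds the optimized dataflow; the job never ran, because
		// execute() aborted the program right after capturing the plan
	}
	catch (ProgramInvocationException e) {
		// the user program failed before it ever reached execute()
	}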

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
index 10096da..091a959 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
@@ -40,12 +40,9 @@ import java.util.jar.JarEntry;
 import java.util.jar.JarFile;
 import java.util.jar.Manifest;
 
-import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.Program;
 import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.ExecutionEnvironmentFactory;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.dag.DataSinkNode;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
@@ -166,7 +163,7 @@ public class PackagedProgram {
 		}
 	}
 	
-	public PackagedProgram(Class<?> entryPointClass, String... args) throws ProgramInvocationException {
+	PackagedProgram(Class<?> entryPointClass, String... args) throws ProgramInvocationException {
 		this.jarFile = null;
 		this.args = args == null ? new String[0] : args;
 		
@@ -685,51 +682,5 @@ public class PackagedProgram {
 			throw new ProgramInvocationException("Cannot access jar file" + (t.getMessage() == null ? "." : ": " + t.getMessage()), t);
 		}
 	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	public static final class PreviewPlanEnvironment extends ExecutionEnvironment {
-
-		private List<DataSinkNode> previewPlan;
-		private Plan plan;
-		
-		private String preview = null;
-		
-		@Override
-		public JobExecutionResult execute(String jobName) throws Exception {
-			this.plan = createProgramPlan(jobName);
-			this.previewPlan = Optimizer.createPreOptimizedPlan((Plan) plan);
-			
-			// do not go on with anything now!
-			throw new Client.ProgramAbortException();
-		}
 
-		@Override
-		public String getExecutionPlan() throws Exception {
-			Plan plan = createProgramPlan("unused");
-			this.previewPlan = Optimizer.createPreOptimizedPlan(plan);
-			
-			// do not go on with anything now!
-			throw new Client.ProgramAbortException();
-		}
-		
-		public void setAsContext() {
-			ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
-				@Override
-				public ExecutionEnvironment createExecutionEnvironment() {
-					return PreviewPlanEnvironment.this;
-				}
-			};
-			initializeContextEnvironment(factory);
-		}
-
-		public Plan getPlan() {
-			return this.plan;
-		}
-		
-		public void setPreview(String preview) {
-			this.preview = preview;
-		}
-
-	}
 }