You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/11/13 16:52:22 UTC

[1/2] flink git commit: [FLINK-2850] Limit the types of jobs which can run in detached mode

Repository: flink
Updated Branches:
  refs/heads/master 30647a2e6 -> 00e44eda1


[FLINK-2850] Limit the types of jobs which can run in detached mode

This disallows the following types of interactive programs in detached
mode:

1. More than one call to execute

2. Accessing job execution results such as accumulators, net runtime,
etc. This effectively also disables eager execution functions such as
count, print, and collect.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/00e44eda
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/00e44eda
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/00e44eda

Branch: refs/heads/master
Commit: 00e44eda17052c4b3de4f9690c250a45df6f407f
Parents: b7cf642
Author: Sachin Goel <sa...@gmail.com>
Authored: Mon Nov 9 16:54:15 2015 +0530
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Nov 13 16:28:38 2015 +0100

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    |  14 +-
 .../org/apache/flink/client/program/Client.java |  13 +-
 .../client/program/ContextEnvironment.java      |  95 ++------------
 .../program/ContextEnvironmentFactory.java      |  80 ++++++++++++
 .../client/program/DetachedEnvironment.java     | 116 +++++++++++++++++
 .../apache/flink/client/program/ClientTest.java | 127 +++++++++++++++++++
 .../environment/StreamContextEnvironment.java   |  55 ++------
 .../environment/StreamExecutionEnvironment.java |  12 +-
 8 files changed, 370 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/00e44eda/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index e93025f..ab3af85 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -40,6 +40,7 @@ import java.util.Properties;
 import akka.actor.ActorSystem;
 
 import org.apache.commons.cli.CommandLine;
+import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
@@ -920,7 +921,18 @@ public class CliFrontend {
 
 		System.err.println("\n------------------------------------------------------------");
 		System.err.println(" The program finished with the following exception:\n");
-		t.printStackTrace();
+		if (t.getCause() instanceof InvalidProgramException) {
+			System.err.println(t.getCause().getMessage());
+			StackTraceElement[] trace = t.getCause().getStackTrace();
+			for (StackTraceElement ele: trace) {
+				System.err.println("\t" + ele.toString());
+				if (ele.getMethodName().equals("main")) {
+					break;
+				}
+			}
+		} else {
+			t.printStackTrace();
+		}
 		return 1;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/00e44eda/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index 7d226d2..8f92c51 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -242,9 +242,8 @@ public class Client {
 		}
 		else if (prog.isUsingInteractiveMode()) {
 			LOG.info("Starting program in interactive mode");
-			ContextEnvironment.setAsContext(this, prog.getAllLibraries(), prog.getClasspaths(),
-				prog.getUserCodeClassLoader(), parallelism, true);
-
+			ContextEnvironment.setAsContext(new ContextEnvironmentFactory(this, prog.getAllLibraries(),
+					prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, true));
 			// invoke here
 			try {
 				prog.invokeInteractiveModeForExecution();
@@ -269,18 +268,18 @@ public class Client {
 		}
 		else if (prog.isUsingInteractiveMode()) {
 			LOG.info("Starting program in interactive mode");
-			ContextEnvironment.setAsContext(this, prog.getAllLibraries(), prog.getClasspaths(),
-				prog.getUserCodeClassLoader(), parallelism, false);
+			ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, prog.getAllLibraries(),
+					prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, false);
+			ContextEnvironment.setAsContext(factory);
 
 			// invoke here
 			try {
 				prog.invokeInteractiveModeForExecution();
+				return ((DetachedEnvironment) factory.getLastEnvCreated()).finalizeExecute();
 			}
 			finally {
 				ContextEnvironment.unsetContext();
 			}
-
-			return new JobSubmissionResult(lastJobID);
 		}
 		else {
 			throw new RuntimeException("PackagedProgram does not have a valid invocation mode.");

http://git-wip-us.apache.org/repos/asf/flink/blob/00e44eda/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index d5a28fc..1e3d0d4 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -23,41 +23,30 @@ import java.util.List;
 
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.ExecutionEnvironmentFactory;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
- * Execution Environment for remote execution with the Client.
+ * Execution Environment for remote execution with the Client in blocking fashion.
  */
 public class ContextEnvironment extends ExecutionEnvironment {
 
-	private static final Logger LOG = LoggerFactory.getLogger(ContextEnvironment.class);
+	protected final Client client;
 
-	private final Client client;
+	protected final List<URL> jarFilesToAttach;
 
-	private final List<URL> jarFilesToAttach;
-
-	private final List<URL> classpathsToAttach;
-	
-	private final ClassLoader userCodeClassLoader;
-
-	private final boolean wait;
-	
+	protected final List<URL> classpathsToAttach;
 	
+	protected final ClassLoader userCodeClassLoader;
 	
 	public ContextEnvironment(Client remoteConnection, List<URL> jarFiles, List<URL> classpaths,
-			ClassLoader userCodeClassLoader, boolean wait) {
+			ClassLoader userCodeClassLoader) {
 		this.client = remoteConnection;
 		this.jarFilesToAttach = jarFiles;
 		this.classpathsToAttach = classpaths;
 		this.userCodeClassLoader = userCodeClassLoader;
-		this.wait = wait;
 	}
 
 	@Override
@@ -65,17 +54,8 @@ public class ContextEnvironment extends ExecutionEnvironment {
 		Plan p = createProgramPlan(jobName);
 		JobWithJars toRun = new JobWithJars(p, this.jarFilesToAttach, this.classpathsToAttach,
 				this.userCodeClassLoader);
-
-		if (wait) {
-			this.lastJobExecutionResult = client.runBlocking(toRun, getParallelism());
-			return this.lastJobExecutionResult;
-		}
-		else {
-			JobSubmissionResult result = client.runDetached(toRun, getParallelism());
-			LOG.warn("Job was executed in detached mode, the results will be available on completion.");
-			this.lastJobExecutionResult = JobExecutionResult.fromJobSubmissionResult(result);
-			return this.lastJobExecutionResult;
-		}
+		this.lastJobExecutionResult = client.runBlocking(toRun, getParallelism());
+		return this.lastJobExecutionResult;
 	}
 
 	@Override
@@ -93,10 +73,6 @@ public class ContextEnvironment extends ExecutionEnvironment {
 		jobID = JobID.generate();
 	}
 
-	public boolean isWait() {
-		return wait;
-	}
-
 	@Override
 	public String toString() {
 		return "Context Environment (parallelism = " + (getParallelism() == -1 ? "default" : getParallelism())
@@ -114,63 +90,18 @@ public class ContextEnvironment extends ExecutionEnvironment {
 	public List<URL> getClasspaths(){
 		return classpathsToAttach;
 	}
+
+	public ClassLoader getUserCodeClassLoader() {
+		return userCodeClassLoader;
+	}
 	
 	// --------------------------------------------------------------------------------------------
 	
-	static void setAsContext(Client client, List<URL> jarFilesToAttach, List<URL> classpathsToAttach,
-				ClassLoader userCodeClassLoader, int defaultParallelism, boolean wait)
-	{
-		ContextEnvironmentFactory factory = new ContextEnvironmentFactory(client, jarFilesToAttach,
-				classpathsToAttach, userCodeClassLoader, defaultParallelism, wait);
+	static void setAsContext(ContextEnvironmentFactory factory) {
 		initializeContextEnvironment(factory);
 	}
 	
 	static void unsetContext() {
 		resetContextEnvironment();
 	}
-
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * The factory that instantiates the environment to be used when running jobs that are
-	 * submitted through a pre-configured client connection.
-	 * This happens for example when a job is submitted from the command line.
-	 */
-	public static class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
-		
-		private final Client client;
-		
-		private final List<URL> jarFilesToAttach;
-
-		private final List<URL> classpathsToAttach;
-		
-		private final ClassLoader userCodeClassLoader;
-		
-		private final int defaultParallelism;
-
-		private final boolean wait;
-		
-
-		public ContextEnvironmentFactory(Client client, List<URL> jarFilesToAttach,
-				List<URL> classpathsToAttach, ClassLoader userCodeClassLoader, int defaultParallelism,
-				boolean wait)
-		{
-			this.client = client;
-			this.jarFilesToAttach = jarFilesToAttach;
-			this.classpathsToAttach = classpathsToAttach;
-			this.userCodeClassLoader = userCodeClassLoader;
-			this.defaultParallelism = defaultParallelism;
-			this.wait = wait;
-		}
-		
-		@Override
-		public ExecutionEnvironment createExecutionEnvironment() {
-			ContextEnvironment env = new ContextEnvironment(client, jarFilesToAttach, classpathsToAttach,
-					userCodeClassLoader, wait);
-			if (defaultParallelism > 0) {
-				env.setParallelism(defaultParallelism);
-			}
-			return env;
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/00e44eda/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
new file mode 100644
index 0000000..55f705b
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironmentFactory;
+
+import java.net.URL;
+import java.util.List;
+
+/**
+ * The factory that instantiates the environment to be used when running jobs that are
+ * submitted through a pre-configured client connection.
+ * This happens for example when a job is submitted from the command line.
+ */
+public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
+
+	private final Client client;
+
+	private final List<URL> jarFilesToAttach;
+
+	private final List<URL> classpathsToAttach;
+
+	private final ClassLoader userCodeClassLoader;
+
+	private final int defaultParallelism;
+
+	private final boolean wait;
+
+	private ExecutionEnvironment lastEnvCreated;
+
+
+	public ContextEnvironmentFactory(Client client, List<URL> jarFilesToAttach,
+			List<URL> classpathsToAttach, ClassLoader userCodeClassLoader, int defaultParallelism,
+			boolean wait)
+	{
+		this.client = client;
+		this.jarFilesToAttach = jarFilesToAttach;
+		this.classpathsToAttach = classpathsToAttach;
+		this.userCodeClassLoader = userCodeClassLoader;
+		this.defaultParallelism = defaultParallelism;
+		this.wait = wait;
+	}
+
+	@Override
+	public ExecutionEnvironment createExecutionEnvironment() {
+		if (!wait && lastEnvCreated != null) {
+			throw new InvalidProgramException("Multiple enviornments cannot be created in detached mode");
+		}
+
+		lastEnvCreated = wait ?
+				new ContextEnvironment(client, jarFilesToAttach, classpathsToAttach, userCodeClassLoader) :
+				new DetachedEnvironment(client, jarFilesToAttach, classpathsToAttach, userCodeClassLoader);
+		if (defaultParallelism > 0) {
+			lastEnvCreated.setParallelism(defaultParallelism);
+		}
+		return lastEnvCreated;
+	}
+
+	public ExecutionEnvironment getLastEnvCreated() {
+		return lastEnvCreated;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/00e44eda/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java
new file mode 100644
index 0000000..0b1ae1d
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.optimizer.plan.FlinkPlan;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Execution Environment for remote execution with the Client in detached mode.
+ */
+public class DetachedEnvironment extends ContextEnvironment {
+
+	/** Keeps track of the program plan for the Client to access. */
+	private FlinkPlan detachedPlan;
+
+	private static final Logger LOG = LoggerFactory.getLogger(DetachedEnvironment.class);
+
+	public DetachedEnvironment(Client remoteConnection, List<URL> jarFiles, List<URL> classpaths, ClassLoader userCodeClassLoader) {
+		super(remoteConnection, jarFiles, classpaths, userCodeClassLoader);
+	}
+
+	@Override
+	public JobExecutionResult execute(String jobName) throws Exception {
+		Plan p = createProgramPlan(jobName);
+		setDetachedPlan(Client.getOptimizedPlan(client.compiler, p, getParallelism()));
+		LOG.warn("Job was executed in detached mode, the results will be available on completion.");
+		this.lastJobExecutionResult = DetachedJobExecutionResult.INSTANCE;
+		return this.lastJobExecutionResult;
+	}
+
+	public void setDetachedPlan(FlinkPlan plan) {
+		if (detachedPlan == null) {
+			detachedPlan = plan;
+		} else {
+			throw new InvalidProgramException(DetachedJobExecutionResult.DETACHED_MESSAGE +
+					DetachedJobExecutionResult.EXECUTE_TWICE_MESSAGE);
+		}
+	}
+
+	/**
+	 * Finishes this Context Environment's execution by explicitly running the plan constructed.
+	 */
+	JobSubmissionResult finalizeExecute() throws ProgramInvocationException {
+		return client.runDetached(detachedPlan, jarFilesToAttach, classpathsToAttach, userCodeClassLoader);
+	}
+
+	public static final class DetachedJobExecutionResult extends JobExecutionResult {
+
+		public static final DetachedJobExecutionResult INSTANCE = new DetachedJobExecutionResult();
+
+		static final String DETACHED_MESSAGE = "Job was submitted in detached mode. ";
+
+		static final String EXECUTE_TWICE_MESSAGE = "Only one call to execute is allowed. ";
+
+		static final String EAGER_FUNCTION_MESSAGE = "Please make sure your program doesn't call " +
+				"an eager execution function [collect, print, printToErr, count]. ";
+
+		static final String JOB_RESULT_MESSAGE = "Results of job execution, such as accumulators," +
+				" runtime, job id etc. are not available. ";
+
+		private DetachedJobExecutionResult() {
+			super(null, -1, null);
+		}
+
+		@Override
+		public long getNetRuntime() {
+			throw new InvalidProgramException(DETACHED_MESSAGE + JOB_RESULT_MESSAGE);
+		}
+
+		@Override
+		public <T> T getAccumulatorResult(String accumulatorName) {
+			throw new InvalidProgramException(DETACHED_MESSAGE + JOB_RESULT_MESSAGE + EAGER_FUNCTION_MESSAGE);
+		}
+
+		@Override
+		public Map<String, Object> getAllAccumulatorResults() {
+			throw new InvalidProgramException(DETACHED_MESSAGE + JOB_RESULT_MESSAGE);
+		}
+
+		@Override
+		public Integer getIntCounterResult(String accumulatorName) {
+			throw new InvalidProgramException(DETACHED_MESSAGE + JOB_RESULT_MESSAGE);
+		}
+
+		@Override
+		public JobID getJobID() {
+			throw new InvalidProgramException(DETACHED_MESSAGE + JOB_RESULT_MESSAGE);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/00e44eda/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 1fbf681..cc32d9c 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.program.DetachedEnvironment.DetachedJobExecutionResult;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.optimizer.DataStatistics;
@@ -75,6 +76,9 @@ public class ClientTest {
 
 	private ActorSystem jobManagerSystem;
 
+	private static final String ACCUMULATOR_NAME = "test_accumulator";
+
+	private static final String FAIL_MESSAGE = "Invalid program should have thrown ProgramInvocationException.";
 
 	@Before
 	public void setUp() throws Exception {
@@ -118,6 +122,75 @@ public class ClientTest {
 	}
 
 	/**
+	 * Tests that invalid detached mode programs fail.
+	 */
+	@Test
+	public void testDetachedMode() throws Exception{
+		jobManagerSystem.actorOf(Props.create(SuccessReturningActor.class), JobManager.JOB_MANAGER_NAME());
+		Client out = new Client(config);
+
+		try {
+			PackagedProgram prg = new PackagedProgram(TestExecuteTwice.class);
+			out.runDetached(prg, 1);
+			fail(FAIL_MESSAGE);
+		} catch (ProgramInvocationException e) {
+			assertEquals(
+					DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.EXECUTE_TWICE_MESSAGE,
+					e.getCause().getMessage());
+		}
+
+		try {
+			PackagedProgram prg = new PackagedProgram(TestEager.class);
+			out.runDetached(prg, 1);
+			fail(FAIL_MESSAGE);
+		} catch (ProgramInvocationException e) {
+			assertEquals(
+					DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.JOB_RESULT_MESSAGE + DetachedJobExecutionResult.EAGER_FUNCTION_MESSAGE,
+					e.getCause().getMessage());
+		}
+
+		try {
+			PackagedProgram prg = new PackagedProgram(TestGetRuntime.class);
+			out.runDetached(prg, 1);
+			fail(FAIL_MESSAGE);
+		} catch (ProgramInvocationException e) {
+			assertEquals(
+					DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.JOB_RESULT_MESSAGE,
+					e.getCause().getMessage());
+		}
+
+		try {
+			PackagedProgram prg = new PackagedProgram(TestGetJobID.class);
+			out.runDetached(prg, 1);
+			fail(FAIL_MESSAGE);
+		} catch (ProgramInvocationException e) {
+			assertEquals(
+					DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.JOB_RESULT_MESSAGE,
+					e.getCause().getMessage());
+		}
+
+		try {
+			PackagedProgram prg = new PackagedProgram(TestGetAccumulator.class);
+			out.runDetached(prg, 1);
+			fail(FAIL_MESSAGE);
+		} catch (ProgramInvocationException e) {
+			assertEquals(
+					DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.JOB_RESULT_MESSAGE + DetachedJobExecutionResult.EAGER_FUNCTION_MESSAGE,
+					e.getCause().getMessage());
+		}
+
+		try {
+			PackagedProgram prg = new PackagedProgram(TestGetAllAccumulator.class);
+			out.runDetached(prg, 1);
+			fail(FAIL_MESSAGE);
+		} catch (ProgramInvocationException e) {
+			assertEquals(
+					DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.JOB_RESULT_MESSAGE,
+					e.getCause().getMessage());
+		}
+	}
+
+	/**
 	 * This test verifies correct job submission messaging logic and plan translation calls.
 	 */
 	@Test
@@ -304,4 +377,58 @@ public class ClientTest {
 			return "TestOptimizerPlan <input-file-path> <output-file-path>";
 		}
 	}
+
+	private static final class TestExecuteTwice {
+
+		public static void main(String args[]) throws Exception {
+			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.fromElements(1, 2).output(new DiscardingOutputFormat<Integer>());
+			env.execute();
+			env.fromElements(1, 2).collect();
+		}
+	}
+
+	private static final class TestEager {
+
+		public static void main(String args[]) throws Exception {
+			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.fromElements(1, 2).collect();
+		}
+	}
+
+	private static final class TestGetRuntime {
+
+		public static void main(String args[]) throws Exception {
+			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.fromElements(1, 2).output(new DiscardingOutputFormat<Integer>());
+			env.execute().getNetRuntime();
+		}
+	}
+
+	private static final class TestGetJobID {
+
+		public static void main(String args[]) throws Exception {
+			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.fromElements(1, 2).output(new DiscardingOutputFormat<Integer>());
+			env.execute().getJobID();
+		}
+	}
+
+	private static final class TestGetAccumulator {
+
+		public static void main(String args[]) throws Exception {
+			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.fromElements(1, 2).output(new DiscardingOutputFormat<Integer>());
+			env.execute().getAccumulatorResult(ACCUMULATOR_NAME);
+		}
+	}
+
+	private static final class TestGetAllAccumulator {
+
+		public static void main(String args[]) throws Exception {
+			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.fromElements(1, 2).output(new DiscardingOutputFormat<Integer>());
+			env.execute().getAllAccumulatorResults();
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/00e44eda/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index 240c9d2..7a68fc5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -17,17 +17,13 @@
 
 package org.apache.flink.streaming.api.environment;
 
-import java.net.URL;
-import java.util.List;
-
 import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.JobSubmissionResult;
-import org.apache.flink.client.program.Client;
-import org.apache.flink.client.program.JobWithJars;
+import org.apache.flink.client.program.ContextEnvironment;
+import org.apache.flink.client.program.DetachedEnvironment;
+
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
-
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,31 +32,13 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
 
 	private static final Logger LOG = LoggerFactory.getLogger(StreamContextEnvironment.class);
 
-	private final List<URL> jars;
+	private final ContextEnvironment ctx;
 
-	private final List<URL> classpaths;
-	
-	private final Client client;
-
-	private final ClassLoader userCodeClassLoader;
-	
-	private final boolean wait;
-
-	protected StreamContextEnvironment(Client client, List<URL> jars, List<URL> classpaths, int parallelism,
-			boolean wait) {
-		this.client = client;
-		this.jars = jars;
-		this.classpaths = classpaths;
-		this.wait = wait;
-		
-		this.userCodeClassLoader = JobWithJars.buildUserCodeClassLoader(jars, classpaths,
-				getClass().getClassLoader());
-		
-		if (parallelism > 0) {
-			setParallelism(parallelism);
-		}
-		else {
-			// determine parallelism
+	protected StreamContextEnvironment(ContextEnvironment ctx) {
+		this.ctx = ctx;
+		if (ctx.getParallelism() > 0) {
+			setParallelism(ctx.getParallelism());
+		} else {
 			setParallelism(GlobalConfiguration.getInteger(
 					ConfigConstants.DEFAULT_PARALLELISM_KEY,
 					ConfigConstants.DEFAULT_PARALLELISM));
@@ -68,11 +46,6 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
 	}
 
 	@Override
-	public JobExecutionResult execute() throws Exception {
-		return execute(DEFAULT_JOB_NAME);
-	}
-
-	@Override
 	public JobExecutionResult execute(String jobName) throws Exception {
 		Preconditions.checkNotNull("Streaming Job name should not be null.");
 
@@ -82,12 +55,12 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
 		transformations.clear();
 
 		// execute the programs
-		if (wait) {
-			return client.runBlocking(streamGraph, jars, classpaths, userCodeClassLoader);
-		} else {
-			JobSubmissionResult result = client.runDetached(streamGraph, jars, classpaths, userCodeClassLoader);
+		if (ctx instanceof DetachedEnvironment) {
 			LOG.warn("Job was executed in detached mode, the results will be available on completion.");
-			return JobExecutionResult.fromJobSubmissionResult(result);
+			((DetachedEnvironment) ctx).setDetachedPlan(streamGraph);
+			return DetachedEnvironment.DetachedJobExecutionResult.INSTANCE;
+		} else {
+			return ctx.getClient().runBlocking(streamGraph, ctx.getJars(), ctx.getClasspaths(), ctx.getUserCodeClassLoader());
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/00e44eda/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 3c961f0..5cc0007 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -39,7 +39,6 @@ import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.ValueTypeInfo;
-import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.ContextEnvironment;
 import org.apache.flink.client.program.OptimizerPlanEnvironment;
 import org.apache.flink.client.program.PreviewPlanEnvironment;
@@ -70,7 +69,6 @@ import org.apache.flink.util.SplittableIterator;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -1272,9 +1270,7 @@ public abstract class StreamExecutionEnvironment {
 		
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		if (env instanceof ContextEnvironment) {
-			ContextEnvironment ctx = (ContextEnvironment) env;
-			return createContextEnvironment(ctx.getClient(), ctx.getJars(), ctx.getClasspaths(),
-					ctx.getParallelism(), ctx.isWait());
+			return new StreamContextEnvironment((ContextEnvironment) env);
 		} else if (env instanceof OptimizerPlanEnvironment | env instanceof PreviewPlanEnvironment) {
 			return new StreamPlanEnvironment(env);
 		} else {
@@ -1282,12 +1278,6 @@ public abstract class StreamExecutionEnvironment {
 		}
 	}
 
-	private static StreamExecutionEnvironment createContextEnvironment(
-			Client client, List<URL> jars, List<URL> classpaths, int parallelism, boolean wait)
-	{
-		return new StreamContextEnvironment(client, jars, classpaths, parallelism, wait);
-	}
-
 	/**
 	 * Creates a {@link LocalStreamEnvironment}. The local execution environment
 	 * will run the program in a multi-threaded fashion in the same JVM as the


[2/2] flink git commit: [FLINK-2797][cli] Add support for running jobs in detached mode from CLI

Posted by mx...@apache.org.
[FLINK-2797][cli] Add support for running jobs in detached mode from CLI

This closes #1214.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b7cf642b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b7cf642b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b7cf642b

Branch: refs/heads/master
Commit: b7cf642bca109dced7dc1a02e831405887d7a9a6
Parents: 30647a2
Author: Sachin Goel <sa...@gmail.com>
Authored: Fri Oct 2 18:11:09 2015 +0530
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Nov 13 16:28:38 2015 +0100

----------------------------------------------------------------------
 docs/apis/cli.md                                | 10 +++++--
 .../org/apache/flink/client/CliFrontend.java    | 29 +++++++++++++-------
 .../flink/client/cli/CliFrontendParser.java     |  6 ++++
 .../apache/flink/client/cli/ProgramOptions.java | 16 +++++++----
 .../apache/flink/client/CliFrontendRunTest.java | 21 ++++++++++----
 5 files changed, 59 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b7cf642b/docs/apis/cli.md
----------------------------------------------------------------------
diff --git a/docs/apis/cli.md b/docs/apis/cli.md
index 6bd2352..dfa1baf 100644
--- a/docs/apis/cli.md
+++ b/docs/apis/cli.md
@@ -62,6 +62,10 @@ The command line can be used to
 
             ./bin/flink run -q ./examples/WordCount.jar
 
+-   Run example program in detached mode
+
+            ./bin/flink run -d ./examples/WordCount.jar
+
 -   Run example program on a specific JobManager:
 
         ./bin/flink run -m myJMHost:6123 \
@@ -128,14 +132,16 @@ Action "run" compiles and runs a program.
                                       program. Optional flag to override the
                                       default value specified in the
                                       configuration.
-     -q --sysoutLogging               Specfying this flag will disable log messages
+     -q --sysoutLogging               Specifying this flag will disable log messages
                                       being reported on the console. All messages
                                       however will still be logged by SLF4J loggers,
                                       regardless of this setting.
+     -d --detached                    Specifying this option will run the job in
+                                      detached mode.
 
   Additional arguments if -m yarn-cluster is set:
      -yD <arg>                            Dynamic properties
-     -yd,--yarndetached                   Start detached
+     -yd,--yarndetached                   Start detached [consider using -d flag above]
      -yj,--yarnjar <arg>                  Path to Flink jar file
      -yjm,--yarnjobManagerMemory <arg>    Memory for JobManager Container [in
                                           MB]

http://git-wip-us.apache.org/repos/asf/flink/blob/b7cf642b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 933a22c..e93025f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -296,7 +296,7 @@ public class CliFrontend {
 			int userParallelism = options.getParallelism();
 			LOG.debug("User parallelism is set to {}", userParallelism);
 
-			Client client = getClient(options, program.getMainClassName(), userParallelism);
+			Client client = getClient(options, program.getMainClassName(), userParallelism, options.getDetachedMode());
 			client.setPrintStatusDuringExecution(options.getStdoutLogging());
 			LOG.debug("Client slots is set to {}", client.getMaxSlots());
 
@@ -307,16 +307,11 @@ public class CliFrontend {
 					userParallelism = client.getMaxSlots();
 				}
 
-				// check if detached per job yarn cluster is used to start flink
-				if (yarnCluster != null && yarnCluster.isDetached()) {
-					logAndSysout("The Flink YARN client has been started in detached mode. In order to stop " +
-							"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
-							"yarn application -kill " + yarnCluster.getApplicationId() + "\n" +
-							"Please also note that the temporary files of the YARN session in the home directoy will not be removed.");
+				// detached mode
+				if (options.getDetachedMode() || (yarnCluster != null && yarnCluster.isDetached())) {
 					exitCode = executeProgramDetached(program, client, userParallelism);
 				}
 				else {
-					// regular (blocking) execution.
 					exitCode = executeProgramBlocking(program, client, userParallelism);
 				}
 
@@ -638,6 +633,14 @@ public class CliFrontend {
 	// --------------------------------------------------------------------------------------------
 
 	protected int executeProgramDetached(PackagedProgram program, Client client, int parallelism) {
+		// log message for detached yarn job
+		if (yarnCluster != null) {
+			logAndSysout("The Flink YARN client has been started in detached mode. In order to stop " +
+					"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
+					"yarn application -kill " + yarnCluster.getApplicationId() + "\n" +
+					"Please also note that the temporary files of the YARN session in the home directoy will not be removed.");
+		}
+
 		LOG.info("Starting execution of program");
 
 		JobSubmissionResult result;
@@ -649,7 +652,7 @@ public class CliFrontend {
 			program.deleteExtractedLibraries();
 		}
 
-		if (yarnCluster != null && yarnCluster.isDetached()) {
+		if (yarnCluster != null) {
 			yarnCluster.stopAfterJob(result.getJobID());
 			yarnCluster.disconnect();
 		}
@@ -796,7 +799,8 @@ public class CliFrontend {
 	protected Client getClient(
 			CommandLineOptions options,
 			String programName,
-			int userParallelism)
+			int userParallelism,
+			boolean detachedMode)
 		throws Exception {
 		InetSocketAddress jobManagerAddress;
 		int maxSlots = -1;
@@ -811,6 +815,11 @@ public class CliFrontend {
 				throw new RuntimeException("Unable to create Flink YARN Client. Check previous log messages");
 			}
 			flinkYarnClient.setName("Flink Application: " + programName);
+			// If the general detached flag was not set, do not overwrite the detached setting
+			// that was loaded from the YARN options.
+			if (detachedMode) {
+				flinkYarnClient.setDetachedMode(true);
+			}
 
 			// the number of slots available from YARN:
 			int yarnTmSlots = flinkYarnClient.getTaskManagerSlots();

http://git-wip-us.apache.org/repos/asf/flink/blob/b7cf642b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 028aead..1226d48 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -60,6 +60,9 @@ public class CliFrontendParser {
 	static final Option LOGGING_OPTION = new Option("q", "sysoutLogging", false, "If present, " +
 			"supress logging output to standard out.");
 
+	static final Option DETACHED_OPTION = new Option("d", "detached", false, "If present, runs " +
+			"the job in detached mode");
+
 	static final Option ARGS_OPTION = new Option("a", "arguments", true,
 			"Program arguments. Arguments can also be added without -a, simply as trailing parameters.");
 
@@ -94,6 +97,7 @@ public class CliFrontendParser {
 		PARALLELISM_OPTION.setArgName("parallelism");
 
 		LOGGING_OPTION.setRequired(false);
+		DETACHED_OPTION.setRequired(false);
 
 		ARGS_OPTION.setRequired(false);
 		ARGS_OPTION.setArgName("programArgs");
@@ -123,6 +127,7 @@ public class CliFrontendParser {
 		options.addOption(PARALLELISM_OPTION);
 		options.addOption(ARGS_OPTION);
 		options.addOption(LOGGING_OPTION);
+		options.addOption(DETACHED_OPTION);
 
 		// also add the YARN options so that the parser can parse them
 		yarnSessionCLi.getYARNSessionCLIOptions(options);
@@ -134,6 +139,7 @@ public class CliFrontendParser {
 		options.addOption(CLASSPATH_OPTION);
 		options.addOption(PARALLELISM_OPTION);
 		options.addOption(LOGGING_OPTION);
+		options.addOption(DETACHED_OPTION);
 		return options;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b7cf642b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
index 11382d2..499d3ca 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 
 import static org.apache.flink.client.cli.CliFrontendParser.ARGS_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.DETACHED_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.JAR_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.CLASS_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.CLASSPATH_OPTION;
@@ -49,6 +50,8 @@ public abstract class ProgramOptions extends CommandLineOptions {
 
 	private final boolean stdoutLogging;
 
+	private final boolean detachedMode;
+
 	protected ProgramOptions(CommandLine line) throws CliArgsException {
 		super(line);
 
@@ -100,11 +103,8 @@ public abstract class ProgramOptions extends CommandLineOptions {
 			parallelism = -1;
 		}
 
-		if(line.hasOption(LOGGING_OPTION.getOpt())){
-			stdoutLogging = false;
-		} else{
-			stdoutLogging = true;
-		}
+		stdoutLogging = !line.hasOption(LOGGING_OPTION.getOpt());
+		detachedMode = line.hasOption(DETACHED_OPTION.getOpt());
 	}
 
 	public String getJarFilePath() {
@@ -130,4 +130,8 @@ public abstract class ProgramOptions extends CommandLineOptions {
 	public boolean getStdoutLogging() {
 		return stdoutLogging;
 	}
-}
\ No newline at end of file
+
+	public boolean getDetachedMode() {
+		return detachedMode;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7cf642b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
index f910312..64c2709 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
@@ -52,21 +52,28 @@ public class CliFrontendRunTest {
 			// test without parallelism
 			{
 				String[] parameters = {"-v", getTestJarPath()};
-				RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(-1, true);
+				RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(-1, true, false);
 				assertEquals(0, testFrontend.run(parameters));
 			}
 
 			// test configure parallelism
 			{
 				String[] parameters = {"-v", "-p", "42",  getTestJarPath()};
-				RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(42, true);
+				RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(42, true, false);
 				assertEquals(0, testFrontend.run(parameters));
 			}
 
 			// test configure sysout logging
 			{
 				String[] parameters = {"-p", "2", "-q", getTestJarPath()};
-				RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(2, false);
+				RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(2, false, false);
+				assertEquals(0, testFrontend.run(parameters));
+			}
+
+			// test detached mode
+			{
+				String[] parameters = {"-p", "2", "-d", getTestJarPath()};
+				RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(2, false, true);
 				assertEquals(0, testFrontend.run(parameters));
 			}
 
@@ -96,15 +103,18 @@ public class CliFrontendRunTest {
 		
 		private final int expectedParallelism;
 		private final boolean sysoutLogging;
+		private final boolean isDetached;
 		
-		public RunTestingCliFrontend(int expectedParallelism, boolean logging) throws Exception {
+		public RunTestingCliFrontend(int expectedParallelism, boolean logging, boolean isDetached) throws Exception {
 			super(CliFrontendTestUtils.getConfigDir());
 			this.expectedParallelism = expectedParallelism;
 			this.sysoutLogging = logging;
+			this.isDetached = isDetached;
 		}
 
 		@Override
 		protected int executeProgramDetached(PackagedProgram program, Client client, int parallelism) {
+			assertTrue(isDetached);
 			assertEquals(this.expectedParallelism, parallelism);
 			assertEquals(this.sysoutLogging, client.getPrintStatusDuringExecution());
 			return 0;
@@ -112,11 +122,12 @@ public class CliFrontendRunTest {
 
 		@Override
 		protected int executeProgramBlocking(PackagedProgram program, Client client, int parallelism) {
+			assertTrue(!isDetached);
 			return 0;
 		}
 
 		@Override
-		protected Client getClient(CommandLineOptions options, String programName, int userParallelism) throws Exception {
+		protected Client getClient(CommandLineOptions options, String programName, int userParallelism, boolean detached) throws Exception {
 			return Mockito.mock(Client.class);
 		}
 	}