You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/11/13 16:52:22 UTC
[1/2] flink git commit: [FLINK-2850] Limit the types of jobs which can run in detached mode
Repository: flink
Updated Branches:
refs/heads/master 30647a2e6 -> 00e44eda1
[FLINK-2850] Limit the types of jobs which can run in detached mode
This disallows the following types of interactive programs in detached mode:
1. Programs that make more than one call to execute.
2. Programs that access job execution results such as accumulators, net runtime, etc.
This effectively also disables eager execution functions such as count, print, collect, etc.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/00e44eda
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/00e44eda
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/00e44eda
Branch: refs/heads/master
Commit: 00e44eda17052c4b3de4f9690c250a45df6f407f
Parents: b7cf642
Author: Sachin Goel <sa...@gmail.com>
Authored: Mon Nov 9 16:54:15 2015 +0530
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Nov 13 16:28:38 2015 +0100
----------------------------------------------------------------------
.../org/apache/flink/client/CliFrontend.java | 14 +-
.../org/apache/flink/client/program/Client.java | 13 +-
.../client/program/ContextEnvironment.java | 95 ++------------
.../program/ContextEnvironmentFactory.java | 80 ++++++++++++
.../client/program/DetachedEnvironment.java | 116 +++++++++++++++++
.../apache/flink/client/program/ClientTest.java | 127 +++++++++++++++++++
.../environment/StreamContextEnvironment.java | 55 ++------
.../environment/StreamExecutionEnvironment.java | 12 +-
8 files changed, 370 insertions(+), 142 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/00e44eda/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index e93025f..ab3af85 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -40,6 +40,7 @@ import java.util.Properties;
import akka.actor.ActorSystem;
import org.apache.commons.cli.CommandLine;
+import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
@@ -920,7 +921,18 @@ public class CliFrontend {
System.err.println("\n------------------------------------------------------------");
System.err.println(" The program finished with the following exception:\n");
- t.printStackTrace();
+ if (t.getCause() instanceof InvalidProgramException) {
+ System.err.println(t.getCause().getMessage());
+ StackTraceElement[] trace = t.getCause().getStackTrace();
+ for (StackTraceElement ele: trace) {
+ System.err.println("\t" + ele.toString());
+ if (ele.getMethodName().equals("main")) {
+ break;
+ }
+ }
+ } else {
+ t.printStackTrace();
+ }
return 1;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/00e44eda/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index 7d226d2..8f92c51 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -242,9 +242,8 @@ public class Client {
}
else if (prog.isUsingInteractiveMode()) {
LOG.info("Starting program in interactive mode");
- ContextEnvironment.setAsContext(this, prog.getAllLibraries(), prog.getClasspaths(),
- prog.getUserCodeClassLoader(), parallelism, true);
-
+ ContextEnvironment.setAsContext(new ContextEnvironmentFactory(this, prog.getAllLibraries(),
+ prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, true));
// invoke here
try {
prog.invokeInteractiveModeForExecution();
@@ -269,18 +268,18 @@ public class Client {
}
else if (prog.isUsingInteractiveMode()) {
LOG.info("Starting program in interactive mode");
- ContextEnvironment.setAsContext(this, prog.getAllLibraries(), prog.getClasspaths(),
- prog.getUserCodeClassLoader(), parallelism, false);
+ ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, prog.getAllLibraries(),
+ prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, false);
+ ContextEnvironment.setAsContext(factory);
// invoke here
try {
prog.invokeInteractiveModeForExecution();
+ return ((DetachedEnvironment) factory.getLastEnvCreated()).finalizeExecute();
}
finally {
ContextEnvironment.unsetContext();
}
-
- return new JobSubmissionResult(lastJobID);
}
else {
throw new RuntimeException("PackagedProgram does not have a valid invocation mode.");
http://git-wip-us.apache.org/repos/asf/flink/blob/00e44eda/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index d5a28fc..1e3d0d4 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -23,41 +23,30 @@ import java.util.List;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.ExecutionEnvironmentFactory;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
- * Execution Environment for remote execution with the Client.
+ * Execution Environment for remote execution with the Client in blocking fashion.
*/
public class ContextEnvironment extends ExecutionEnvironment {
- private static final Logger LOG = LoggerFactory.getLogger(ContextEnvironment.class);
+ protected final Client client;
- private final Client client;
+ protected final List<URL> jarFilesToAttach;
- private final List<URL> jarFilesToAttach;
-
- private final List<URL> classpathsToAttach;
-
- private final ClassLoader userCodeClassLoader;
-
- private final boolean wait;
-
+ protected final List<URL> classpathsToAttach;
+ protected final ClassLoader userCodeClassLoader;
public ContextEnvironment(Client remoteConnection, List<URL> jarFiles, List<URL> classpaths,
- ClassLoader userCodeClassLoader, boolean wait) {
+ ClassLoader userCodeClassLoader) {
this.client = remoteConnection;
this.jarFilesToAttach = jarFiles;
this.classpathsToAttach = classpaths;
this.userCodeClassLoader = userCodeClassLoader;
- this.wait = wait;
}
@Override
@@ -65,17 +54,8 @@ public class ContextEnvironment extends ExecutionEnvironment {
Plan p = createProgramPlan(jobName);
JobWithJars toRun = new JobWithJars(p, this.jarFilesToAttach, this.classpathsToAttach,
this.userCodeClassLoader);
-
- if (wait) {
- this.lastJobExecutionResult = client.runBlocking(toRun, getParallelism());
- return this.lastJobExecutionResult;
- }
- else {
- JobSubmissionResult result = client.runDetached(toRun, getParallelism());
- LOG.warn("Job was executed in detached mode, the results will be available on completion.");
- this.lastJobExecutionResult = JobExecutionResult.fromJobSubmissionResult(result);
- return this.lastJobExecutionResult;
- }
+ this.lastJobExecutionResult = client.runBlocking(toRun, getParallelism());
+ return this.lastJobExecutionResult;
}
@Override
@@ -93,10 +73,6 @@ public class ContextEnvironment extends ExecutionEnvironment {
jobID = JobID.generate();
}
- public boolean isWait() {
- return wait;
- }
-
@Override
public String toString() {
return "Context Environment (parallelism = " + (getParallelism() == -1 ? "default" : getParallelism())
@@ -114,63 +90,18 @@ public class ContextEnvironment extends ExecutionEnvironment {
public List<URL> getClasspaths(){
return classpathsToAttach;
}
+
+ public ClassLoader getUserCodeClassLoader() {
+ return userCodeClassLoader;
+ }
// --------------------------------------------------------------------------------------------
- static void setAsContext(Client client, List<URL> jarFilesToAttach, List<URL> classpathsToAttach,
- ClassLoader userCodeClassLoader, int defaultParallelism, boolean wait)
- {
- ContextEnvironmentFactory factory = new ContextEnvironmentFactory(client, jarFilesToAttach,
- classpathsToAttach, userCodeClassLoader, defaultParallelism, wait);
+ static void setAsContext(ContextEnvironmentFactory factory) {
initializeContextEnvironment(factory);
}
static void unsetContext() {
resetContextEnvironment();
}
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * The factory that instantiates the environment to be used when running jobs that are
- * submitted through a pre-configured client connection.
- * This happens for example when a job is submitted from the command line.
- */
- public static class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
-
- private final Client client;
-
- private final List<URL> jarFilesToAttach;
-
- private final List<URL> classpathsToAttach;
-
- private final ClassLoader userCodeClassLoader;
-
- private final int defaultParallelism;
-
- private final boolean wait;
-
-
- public ContextEnvironmentFactory(Client client, List<URL> jarFilesToAttach,
- List<URL> classpathsToAttach, ClassLoader userCodeClassLoader, int defaultParallelism,
- boolean wait)
- {
- this.client = client;
- this.jarFilesToAttach = jarFilesToAttach;
- this.classpathsToAttach = classpathsToAttach;
- this.userCodeClassLoader = userCodeClassLoader;
- this.defaultParallelism = defaultParallelism;
- this.wait = wait;
- }
-
- @Override
- public ExecutionEnvironment createExecutionEnvironment() {
- ContextEnvironment env = new ContextEnvironment(client, jarFilesToAttach, classpathsToAttach,
- userCodeClassLoader, wait);
- if (defaultParallelism > 0) {
- env.setParallelism(defaultParallelism);
- }
- return env;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/00e44eda/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
new file mode 100644
index 0000000..55f705b
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironmentFactory;
+
+import java.net.URL;
+import java.util.List;
+
+/**
+ * The factory that instantiates the environment to be used when running jobs that are
+ * submitted through a pre-configured client connection.
+ * This happens for example when a job is submitted from the command line.
+ */
+public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
+
+ private final Client client;
+
+ private final List<URL> jarFilesToAttach;
+
+ private final List<URL> classpathsToAttach;
+
+ private final ClassLoader userCodeClassLoader;
+
+ private final int defaultParallelism;
+
+ private final boolean wait;
+
+ private ExecutionEnvironment lastEnvCreated;
+
+
+ public ContextEnvironmentFactory(Client client, List<URL> jarFilesToAttach,
+ List<URL> classpathsToAttach, ClassLoader userCodeClassLoader, int defaultParallelism,
+ boolean wait)
+ {
+ this.client = client;
+ this.jarFilesToAttach = jarFilesToAttach;
+ this.classpathsToAttach = classpathsToAttach;
+ this.userCodeClassLoader = userCodeClassLoader;
+ this.defaultParallelism = defaultParallelism;
+ this.wait = wait;
+ }
+
+ @Override
+ public ExecutionEnvironment createExecutionEnvironment() {
+ if (!wait && lastEnvCreated != null) {
+ throw new InvalidProgramException("Multiple enviornments cannot be created in detached mode");
+ }
+
+ lastEnvCreated = wait ?
+ new ContextEnvironment(client, jarFilesToAttach, classpathsToAttach, userCodeClassLoader) :
+ new DetachedEnvironment(client, jarFilesToAttach, classpathsToAttach, userCodeClassLoader);
+ if (defaultParallelism > 0) {
+ lastEnvCreated.setParallelism(defaultParallelism);
+ }
+ return lastEnvCreated;
+ }
+
+ public ExecutionEnvironment getLastEnvCreated() {
+ return lastEnvCreated;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/00e44eda/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java
new file mode 100644
index 0000000..0b1ae1d
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.optimizer.plan.FlinkPlan;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Execution Environment for remote execution with the Client in detached mode.
+ */
+public class DetachedEnvironment extends ContextEnvironment {
+
+ /** Keeps track of the program plan for the Client to access. */
+ private FlinkPlan detachedPlan;
+
+ private static final Logger LOG = LoggerFactory.getLogger(DetachedEnvironment.class);
+
+ public DetachedEnvironment(Client remoteConnection, List<URL> jarFiles, List<URL> classpaths, ClassLoader userCodeClassLoader) {
+ super(remoteConnection, jarFiles, classpaths, userCodeClassLoader);
+ }
+
+ @Override
+ public JobExecutionResult execute(String jobName) throws Exception {
+ Plan p = createProgramPlan(jobName);
+ setDetachedPlan(Client.getOptimizedPlan(client.compiler, p, getParallelism()));
+ LOG.warn("Job was executed in detached mode, the results will be available on completion.");
+ this.lastJobExecutionResult = DetachedJobExecutionResult.INSTANCE;
+ return this.lastJobExecutionResult;
+ }
+
+ public void setDetachedPlan(FlinkPlan plan) {
+ if (detachedPlan == null) {
+ detachedPlan = plan;
+ } else {
+ throw new InvalidProgramException(DetachedJobExecutionResult.DETACHED_MESSAGE +
+ DetachedJobExecutionResult.EXECUTE_TWICE_MESSAGE);
+ }
+ }
+
+ /**
+ * Finishes this Context Environment's execution by explicitly running the plan constructed.
+ */
+ JobSubmissionResult finalizeExecute() throws ProgramInvocationException {
+ return client.runDetached(detachedPlan, jarFilesToAttach, classpathsToAttach, userCodeClassLoader);
+ }
+
+ public static final class DetachedJobExecutionResult extends JobExecutionResult {
+
+ public static final DetachedJobExecutionResult INSTANCE = new DetachedJobExecutionResult();
+
+ static final String DETACHED_MESSAGE = "Job was submitted in detached mode. ";
+
+ static final String EXECUTE_TWICE_MESSAGE = "Only one call to execute is allowed. ";
+
+ static final String EAGER_FUNCTION_MESSAGE = "Please make sure your program doesn't call " +
+ "an eager execution function [collect, print, printToErr, count]. ";
+
+ static final String JOB_RESULT_MESSAGE = "Results of job execution, such as accumulators," +
+ " runtime, job id etc. are not available. ";
+
+ private DetachedJobExecutionResult() {
+ super(null, -1, null);
+ }
+
+ @Override
+ public long getNetRuntime() {
+ throw new InvalidProgramException(DETACHED_MESSAGE + JOB_RESULT_MESSAGE);
+ }
+
+ @Override
+ public <T> T getAccumulatorResult(String accumulatorName) {
+ throw new InvalidProgramException(DETACHED_MESSAGE + JOB_RESULT_MESSAGE + EAGER_FUNCTION_MESSAGE);
+ }
+
+ @Override
+ public Map<String, Object> getAllAccumulatorResults() {
+ throw new InvalidProgramException(DETACHED_MESSAGE + JOB_RESULT_MESSAGE);
+ }
+
+ @Override
+ public Integer getIntCounterResult(String accumulatorName) {
+ throw new InvalidProgramException(DETACHED_MESSAGE + JOB_RESULT_MESSAGE);
+ }
+
+ @Override
+ public JobID getJobID() {
+ throw new InvalidProgramException(DETACHED_MESSAGE + JOB_RESULT_MESSAGE);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/00e44eda/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 1fbf681..cc32d9c 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.program.DetachedEnvironment.DetachedJobExecutionResult;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.optimizer.DataStatistics;
@@ -75,6 +76,9 @@ public class ClientTest {
private ActorSystem jobManagerSystem;
+ private static final String ACCUMULATOR_NAME = "test_accumulator";
+
+ private static final String FAIL_MESSAGE = "Invalid program should have thrown ProgramInvocationException.";
@Before
public void setUp() throws Exception {
@@ -118,6 +122,75 @@ public class ClientTest {
}
/**
+ * Tests that invalid detached mode programs fail.
+ */
+ @Test
+ public void testDetachedMode() throws Exception{
+ jobManagerSystem.actorOf(Props.create(SuccessReturningActor.class), JobManager.JOB_MANAGER_NAME());
+ Client out = new Client(config);
+
+ try {
+ PackagedProgram prg = new PackagedProgram(TestExecuteTwice.class);
+ out.runDetached(prg, 1);
+ fail(FAIL_MESSAGE);
+ } catch (ProgramInvocationException e) {
+ assertEquals(
+ DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.EXECUTE_TWICE_MESSAGE,
+ e.getCause().getMessage());
+ }
+
+ try {
+ PackagedProgram prg = new PackagedProgram(TestEager.class);
+ out.runDetached(prg, 1);
+ fail(FAIL_MESSAGE);
+ } catch (ProgramInvocationException e) {
+ assertEquals(
+ DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.JOB_RESULT_MESSAGE + DetachedJobExecutionResult.EAGER_FUNCTION_MESSAGE,
+ e.getCause().getMessage());
+ }
+
+ try {
+ PackagedProgram prg = new PackagedProgram(TestGetRuntime.class);
+ out.runDetached(prg, 1);
+ fail(FAIL_MESSAGE);
+ } catch (ProgramInvocationException e) {
+ assertEquals(
+ DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.JOB_RESULT_MESSAGE,
+ e.getCause().getMessage());
+ }
+
+ try {
+ PackagedProgram prg = new PackagedProgram(TestGetJobID.class);
+ out.runDetached(prg, 1);
+ fail(FAIL_MESSAGE);
+ } catch (ProgramInvocationException e) {
+ assertEquals(
+ DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.JOB_RESULT_MESSAGE,
+ e.getCause().getMessage());
+ }
+
+ try {
+ PackagedProgram prg = new PackagedProgram(TestGetAccumulator.class);
+ out.runDetached(prg, 1);
+ fail(FAIL_MESSAGE);
+ } catch (ProgramInvocationException e) {
+ assertEquals(
+ DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.JOB_RESULT_MESSAGE + DetachedJobExecutionResult.EAGER_FUNCTION_MESSAGE,
+ e.getCause().getMessage());
+ }
+
+ try {
+ PackagedProgram prg = new PackagedProgram(TestGetAllAccumulator.class);
+ out.runDetached(prg, 1);
+ fail(FAIL_MESSAGE);
+ } catch (ProgramInvocationException e) {
+ assertEquals(
+ DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.JOB_RESULT_MESSAGE,
+ e.getCause().getMessage());
+ }
+ }
+
+ /**
* This test verifies correct job submission messaging logic and plan translation calls.
*/
@Test
@@ -304,4 +377,58 @@ public class ClientTest {
return "TestOptimizerPlan <input-file-path> <output-file-path>";
}
}
+
+ private static final class TestExecuteTwice {
+
+ public static void main(String args[]) throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.fromElements(1, 2).output(new DiscardingOutputFormat<Integer>());
+ env.execute();
+ env.fromElements(1, 2).collect();
+ }
+ }
+
+ private static final class TestEager {
+
+ public static void main(String args[]) throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.fromElements(1, 2).collect();
+ }
+ }
+
+ private static final class TestGetRuntime {
+
+ public static void main(String args[]) throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.fromElements(1, 2).output(new DiscardingOutputFormat<Integer>());
+ env.execute().getNetRuntime();
+ }
+ }
+
+ private static final class TestGetJobID {
+
+ public static void main(String args[]) throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.fromElements(1, 2).output(new DiscardingOutputFormat<Integer>());
+ env.execute().getJobID();
+ }
+ }
+
+ private static final class TestGetAccumulator {
+
+ public static void main(String args[]) throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.fromElements(1, 2).output(new DiscardingOutputFormat<Integer>());
+ env.execute().getAccumulatorResult(ACCUMULATOR_NAME);
+ }
+ }
+
+ private static final class TestGetAllAccumulator {
+
+ public static void main(String args[]) throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.fromElements(1, 2).output(new DiscardingOutputFormat<Integer>());
+ env.execute().getAllAccumulatorResults();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/00e44eda/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index 240c9d2..7a68fc5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -17,17 +17,13 @@
package org.apache.flink.streaming.api.environment;
-import java.net.URL;
-import java.util.List;
-
import com.google.common.base.Preconditions;
import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.JobSubmissionResult;
-import org.apache.flink.client.program.Client;
-import org.apache.flink.client.program.JobWithJars;
+import org.apache.flink.client.program.ContextEnvironment;
+import org.apache.flink.client.program.DetachedEnvironment;
+
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
-
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,31 +32,13 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
private static final Logger LOG = LoggerFactory.getLogger(StreamContextEnvironment.class);
- private final List<URL> jars;
+ private final ContextEnvironment ctx;
- private final List<URL> classpaths;
-
- private final Client client;
-
- private final ClassLoader userCodeClassLoader;
-
- private final boolean wait;
-
- protected StreamContextEnvironment(Client client, List<URL> jars, List<URL> classpaths, int parallelism,
- boolean wait) {
- this.client = client;
- this.jars = jars;
- this.classpaths = classpaths;
- this.wait = wait;
-
- this.userCodeClassLoader = JobWithJars.buildUserCodeClassLoader(jars, classpaths,
- getClass().getClassLoader());
-
- if (parallelism > 0) {
- setParallelism(parallelism);
- }
- else {
- // determine parallelism
+ protected StreamContextEnvironment(ContextEnvironment ctx) {
+ this.ctx = ctx;
+ if (ctx.getParallelism() > 0) {
+ setParallelism(ctx.getParallelism());
+ } else {
setParallelism(GlobalConfiguration.getInteger(
ConfigConstants.DEFAULT_PARALLELISM_KEY,
ConfigConstants.DEFAULT_PARALLELISM));
@@ -68,11 +46,6 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
}
@Override
- public JobExecutionResult execute() throws Exception {
- return execute(DEFAULT_JOB_NAME);
- }
-
- @Override
public JobExecutionResult execute(String jobName) throws Exception {
Preconditions.checkNotNull("Streaming Job name should not be null.");
@@ -82,12 +55,12 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
transformations.clear();
// execute the programs
- if (wait) {
- return client.runBlocking(streamGraph, jars, classpaths, userCodeClassLoader);
- } else {
- JobSubmissionResult result = client.runDetached(streamGraph, jars, classpaths, userCodeClassLoader);
+ if (ctx instanceof DetachedEnvironment) {
LOG.warn("Job was executed in detached mode, the results will be available on completion.");
- return JobExecutionResult.fromJobSubmissionResult(result);
+ ((DetachedEnvironment) ctx).setDetachedPlan(streamGraph);
+ return DetachedEnvironment.DetachedJobExecutionResult.INSTANCE;
+ } else {
+ return ctx.getClient().runBlocking(streamGraph, ctx.getJars(), ctx.getClasspaths(), ctx.getUserCodeClassLoader());
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/00e44eda/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 3c961f0..5cc0007 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -39,7 +39,6 @@ import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.ValueTypeInfo;
-import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.ContextEnvironment;
import org.apache.flink.client.program.OptimizerPlanEnvironment;
import org.apache.flink.client.program.PreviewPlanEnvironment;
@@ -70,7 +69,6 @@ import org.apache.flink.util.SplittableIterator;
import java.io.IOException;
import java.io.Serializable;
-import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -1272,9 +1270,7 @@ public abstract class StreamExecutionEnvironment {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
if (env instanceof ContextEnvironment) {
- ContextEnvironment ctx = (ContextEnvironment) env;
- return createContextEnvironment(ctx.getClient(), ctx.getJars(), ctx.getClasspaths(),
- ctx.getParallelism(), ctx.isWait());
+ return new StreamContextEnvironment((ContextEnvironment) env);
} else if (env instanceof OptimizerPlanEnvironment | env instanceof PreviewPlanEnvironment) {
return new StreamPlanEnvironment(env);
} else {
@@ -1282,12 +1278,6 @@ public abstract class StreamExecutionEnvironment {
}
}
- private static StreamExecutionEnvironment createContextEnvironment(
- Client client, List<URL> jars, List<URL> classpaths, int parallelism, boolean wait)
- {
- return new StreamContextEnvironment(client, jars, classpaths, parallelism, wait);
- }
-
/**
* Creates a {@link LocalStreamEnvironment}. The local execution environment
* will run the program in a multi-threaded fashion in the same JVM as the
[2/2] flink git commit: [FLINK-2797][cli] Add support for running jobs in detached mode from CLI
Posted by mx...@apache.org.
[FLINK-2797][cli] Add support for running jobs in detached mode from CLI
This closes #1214.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b7cf642b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b7cf642b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b7cf642b
Branch: refs/heads/master
Commit: b7cf642bca109dced7dc1a02e831405887d7a9a6
Parents: 30647a2
Author: Sachin Goel <sa...@gmail.com>
Authored: Fri Oct 2 18:11:09 2015 +0530
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Nov 13 16:28:38 2015 +0100
----------------------------------------------------------------------
docs/apis/cli.md | 10 +++++--
.../org/apache/flink/client/CliFrontend.java | 29 +++++++++++++-------
.../flink/client/cli/CliFrontendParser.java | 6 ++++
.../apache/flink/client/cli/ProgramOptions.java | 16 +++++++----
.../apache/flink/client/CliFrontendRunTest.java | 21 ++++++++++----
5 files changed, 59 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b7cf642b/docs/apis/cli.md
----------------------------------------------------------------------
diff --git a/docs/apis/cli.md b/docs/apis/cli.md
index 6bd2352..dfa1baf 100644
--- a/docs/apis/cli.md
+++ b/docs/apis/cli.md
@@ -62,6 +62,10 @@ The command line can be used to
./bin/flink run -q ./examples/WordCount.jar
+- Run example program in detached mode
+
+ ./bin/flink run -d ./examples/WordCount.jar
+
- Run example program on a specific JobManager:
./bin/flink run -m myJMHost:6123 \
@@ -128,14 +132,16 @@ Action "run" compiles and runs a program.
program. Optional flag to override the
default value specified in the
configuration.
- -q --sysoutLogging Specfying this flag will disable log messages
+ -q --sysoutLogging Specifying this flag will disable log messages
being reported on the console. All messages
however will still be logged by SLF4J loggers,
regardless of this setting.
+ -d --detached Specifying this option will run the job in
+ detached mode.
Additional arguments if -m yarn-cluster is set:
-yD <arg> Dynamic properties
- -yd,--yarndetached Start detached
+ -yd,--yarndetached Start detached [consider using -d flag above]
-yj,--yarnjar <arg> Path to Flink jar file
-yjm,--yarnjobManagerMemory <arg> Memory for JobManager Container [in
MB]
http://git-wip-us.apache.org/repos/asf/flink/blob/b7cf642b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 933a22c..e93025f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -296,7 +296,7 @@ public class CliFrontend {
int userParallelism = options.getParallelism();
LOG.debug("User parallelism is set to {}", userParallelism);
- Client client = getClient(options, program.getMainClassName(), userParallelism);
+ Client client = getClient(options, program.getMainClassName(), userParallelism, options.getDetachedMode());
client.setPrintStatusDuringExecution(options.getStdoutLogging());
LOG.debug("Client slots is set to {}", client.getMaxSlots());
@@ -307,16 +307,11 @@ public class CliFrontend {
userParallelism = client.getMaxSlots();
}
- // check if detached per job yarn cluster is used to start flink
- if (yarnCluster != null && yarnCluster.isDetached()) {
- logAndSysout("The Flink YARN client has been started in detached mode. In order to stop " +
- "Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
- "yarn application -kill " + yarnCluster.getApplicationId() + "\n" +
- "Please also note that the temporary files of the YARN session in the home directoy will not be removed.");
+ // detached mode
+ if (options.getDetachedMode() || (yarnCluster != null && yarnCluster.isDetached())) {
exitCode = executeProgramDetached(program, client, userParallelism);
}
else {
- // regular (blocking) execution.
exitCode = executeProgramBlocking(program, client, userParallelism);
}
@@ -638,6 +633,14 @@ public class CliFrontend {
// --------------------------------------------------------------------------------------------
protected int executeProgramDetached(PackagedProgram program, Client client, int parallelism) {
+ // log message for detached yarn job
+ if (yarnCluster != null) {
+ logAndSysout("The Flink YARN client has been started in detached mode. In order to stop " +
+ "Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
+ "yarn application -kill " + yarnCluster.getApplicationId() + "\n" +
+ "Please also note that the temporary files of the YARN session in the home directory will not be removed.");
+ }
+
LOG.info("Starting execution of program");
JobSubmissionResult result;
@@ -649,7 +652,7 @@ public class CliFrontend {
program.deleteExtractedLibraries();
}
- if (yarnCluster != null && yarnCluster.isDetached()) {
+ if (yarnCluster != null) {
yarnCluster.stopAfterJob(result.getJobID());
yarnCluster.disconnect();
}
@@ -796,7 +799,8 @@ public class CliFrontend {
protected Client getClient(
CommandLineOptions options,
String programName,
- int userParallelism)
+ int userParallelism,
+ boolean detachedMode)
throws Exception {
InetSocketAddress jobManagerAddress;
int maxSlots = -1;
@@ -811,6 +815,11 @@ public class CliFrontend {
throw new RuntimeException("Unable to create Flink YARN Client. Check previous log messages");
}
flinkYarnClient.setName("Flink Application: " + programName);
+ // If the CLI detached flag (-d) was not set, do not overwrite the detached setting
+ // already loaded from the YARN options.
+ if (detachedMode) {
+ flinkYarnClient.setDetachedMode(true);
+ }
// the number of slots available from YARN:
int yarnTmSlots = flinkYarnClient.getTaskManagerSlots();
http://git-wip-us.apache.org/repos/asf/flink/blob/b7cf642b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 028aead..1226d48 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -60,6 +60,9 @@ public class CliFrontendParser {
static final Option LOGGING_OPTION = new Option("q", "sysoutLogging", false, "If present, " +
"supress logging output to standard out.");
+ static final Option DETACHED_OPTION = new Option("d", "detached", false, "If present, runs " +
+ "the job in detached mode");
+
static final Option ARGS_OPTION = new Option("a", "arguments", true,
"Program arguments. Arguments can also be added without -a, simply as trailing parameters.");
@@ -94,6 +97,7 @@ public class CliFrontendParser {
PARALLELISM_OPTION.setArgName("parallelism");
LOGGING_OPTION.setRequired(false);
+ DETACHED_OPTION.setRequired(false);
ARGS_OPTION.setRequired(false);
ARGS_OPTION.setArgName("programArgs");
@@ -123,6 +127,7 @@ public class CliFrontendParser {
options.addOption(PARALLELISM_OPTION);
options.addOption(ARGS_OPTION);
options.addOption(LOGGING_OPTION);
+ options.addOption(DETACHED_OPTION);
// also add the YARN options so that the parser can parse them
yarnSessionCLi.getYARNSessionCLIOptions(options);
@@ -134,6 +139,7 @@ public class CliFrontendParser {
options.addOption(CLASSPATH_OPTION);
options.addOption(PARALLELISM_OPTION);
options.addOption(LOGGING_OPTION);
+ options.addOption(DETACHED_OPTION);
return options;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b7cf642b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
index 11382d2..499d3ca 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import static org.apache.flink.client.cli.CliFrontendParser.ARGS_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.DETACHED_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.JAR_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.CLASS_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.CLASSPATH_OPTION;
@@ -49,6 +50,8 @@ public abstract class ProgramOptions extends CommandLineOptions {
private final boolean stdoutLogging;
+ private final boolean detachedMode;
+
protected ProgramOptions(CommandLine line) throws CliArgsException {
super(line);
@@ -100,11 +103,8 @@ public abstract class ProgramOptions extends CommandLineOptions {
parallelism = -1;
}
- if(line.hasOption(LOGGING_OPTION.getOpt())){
- stdoutLogging = false;
- } else{
- stdoutLogging = true;
- }
+ stdoutLogging = !line.hasOption(LOGGING_OPTION.getOpt());
+ detachedMode = line.hasOption(DETACHED_OPTION.getOpt());
}
public String getJarFilePath() {
@@ -130,4 +130,8 @@ public abstract class ProgramOptions extends CommandLineOptions {
public boolean getStdoutLogging() {
return stdoutLogging;
}
-}
\ No newline at end of file
+
+ public boolean getDetachedMode() {
+ return detachedMode;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b7cf642b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
index f910312..64c2709 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
@@ -52,21 +52,28 @@ public class CliFrontendRunTest {
// test without parallelism
{
String[] parameters = {"-v", getTestJarPath()};
- RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(-1, true);
+ RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(-1, true, false);
assertEquals(0, testFrontend.run(parameters));
}
// test configure parallelism
{
String[] parameters = {"-v", "-p", "42", getTestJarPath()};
- RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(42, true);
+ RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(42, true, false);
assertEquals(0, testFrontend.run(parameters));
}
// test configure sysout logging
{
String[] parameters = {"-p", "2", "-q", getTestJarPath()};
- RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(2, false);
+ RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(2, false, false);
+ assertEquals(0, testFrontend.run(parameters));
+ }
+
+ // test detached mode
+ {
+ String[] parameters = {"-p", "2", "-d", getTestJarPath()};
+ RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(2, false, true);
assertEquals(0, testFrontend.run(parameters));
}
@@ -96,15 +103,18 @@ public class CliFrontendRunTest {
private final int expectedParallelism;
private final boolean sysoutLogging;
+ private final boolean isDetached;
- public RunTestingCliFrontend(int expectedParallelism, boolean logging) throws Exception {
+ public RunTestingCliFrontend(int expectedParallelism, boolean logging, boolean isDetached) throws Exception {
super(CliFrontendTestUtils.getConfigDir());
this.expectedParallelism = expectedParallelism;
this.sysoutLogging = logging;
+ this.isDetached = isDetached;
}
@Override
protected int executeProgramDetached(PackagedProgram program, Client client, int parallelism) {
+ assertTrue(isDetached);
assertEquals(this.expectedParallelism, parallelism);
assertEquals(this.sysoutLogging, client.getPrintStatusDuringExecution());
return 0;
@@ -112,11 +122,12 @@ public class CliFrontendRunTest {
@Override
protected int executeProgramBlocking(PackagedProgram program, Client client, int parallelism) {
+ assertTrue(!isDetached);
return 0;
}
@Override
- protected Client getClient(CommandLineOptions options, String programName, int userParallelism) throws Exception {
+ protected Client getClient(CommandLineOptions options, String programName, int userParallelism, boolean detached) throws Exception {
return Mockito.mock(Client.class);
}
}