You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/09/22 21:57:20 UTC
[4/4] flink git commit: [FLINK-2097][core] implement a job session
management
[FLINK-2097][core] implement a job session management
Sessions make sure that the JobManager does not immediately discard a
JobGraph after execution, but keeps it around for further operations to
be attached to the graph. That is the basis for interactive sessions.
This pull request implements a rudimentary session management. Together
with the backtracking #640, this will enable users to submit jobs to the
cluster and access intermediate results. Session handling ensures that
the results are cleared eventually.
ExecutionGraphs are kept as long as
- no timeout occurred or
- the session has not been explicitly ended
The following changes have also been made in this pull request:
- The Job ID is created through the ExecutionEnvironment and passed through
- Sessions can be terminated by the ExecutionEnvironment or directly
through the executor
- The environments use reapers (local) and shutdown hooks (remote) to
ensure session termination when the environment runs out of scope
- The Client manages only connections to the JobManager, it is not job
specific
This closes #858.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/71bf2f57
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/71bf2f57
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/71bf2f57
Branch: refs/heads/master
Commit: 71bf2f570861daae53b24bfcf1d06aedb85311b9
Parents: 7984acc
Author: Maximilian Michels <mx...@apache.org>
Authored: Fri Sep 4 17:34:44 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Tue Sep 22 19:55:46 2015 +0200
----------------------------------------------------------------------
.../org/apache/flink/client/CliFrontend.java | 182 +++---
.../org/apache/flink/client/LocalExecutor.java | 222 ++++---
.../org/apache/flink/client/RemoteExecutor.java | 188 ++++--
.../org/apache/flink/client/program/Client.java | 653 +++++++++----------
.../client/program/ContextEnvironment.java | 40 +-
.../flink/client/program/JobWithJars.java | 5 +-
.../program/OptimizerPlanEnvironment.java | 132 ++++
.../flink/client/program/PackagedProgram.java | 51 +-
.../client/program/PreviewPlanEnvironment.java | 80 +++
.../flink/client/web/JobSubmissionServlet.java | 30 +-
.../flink/client/CliFrontendInfoTest.java | 81 +--
.../client/CliFrontendPackageProgramTest.java | 10 +-
.../apache/flink/client/CliFrontendRunTest.java | 30 +-
.../RemoteExecutorHostnameResolutionTest.java | 6 +-
.../client/program/ClientConnectionTest.java | 8 +-
.../apache/flink/client/program/ClientTest.java | 142 ++--
.../ExecutionPlanAfterExecutionTest.java | 15 +-
.../program/ExecutionPlanCreationTest.java | 9 +-
.../client/program/PackagedProgramTest.java | 1 -
.../stormcompatibility/api/FlinkClient.java | 22 +-
.../flink/api/common/JobExecutionResult.java | 17 +-
.../java/org/apache/flink/api/common/JobID.java | 46 +-
.../flink/api/common/JobSubmissionResult.java | 5 +-
.../java/org/apache/flink/api/common/Plan.java | 71 +-
.../apache/flink/api/common/PlanExecutor.java | 85 ++-
.../flink/api/java/CollectionEnvironment.java | 4 +
.../flink/api/java/ExecutionEnvironment.java | 97 ++-
.../apache/flink/api/java/LocalEnvironment.java | 180 ++++-
.../flink/api/java/RemoteEnvironment.java | 153 ++++-
.../flink/optimizer/plan/OptimizedPlan.java | 15 +-
.../plantranslate/JobGraphGenerator.java | 24 +-
.../optimizer/postpass/JavaApiPostPass.java | 2 +-
.../apache/flink/runtime/client/JobClient.java | 11 +-
.../runtime/client/JobExecutionException.java | 2 +-
.../runtime/executiongraph/ExecutionGraph.java | 8 +
.../apache/flink/runtime/jobgraph/JobGraph.java | 58 +-
.../runtime/taskmanager/TaskExecutionState.java | 2 +-
.../flink/runtime/jobmanager/JobInfo.scala | 18 +-
.../flink/runtime/jobmanager/JobManager.scala | 98 ++-
.../runtime/jobmanager/MemoryArchivist.scala | 11 +-
.../runtime/messages/JobManagerMessages.scala | 6 +
.../runtime/minicluster/FlinkMiniCluster.scala | 9 +-
.../PartialConsumePipelinedResultTest.java | 3 +-
.../TaskManagerProcessReapingTest.java | 2 +-
.../runtime/jobmanager/JobManagerITCase.scala | 126 +++-
.../flink/api/scala/ExecutionEnvironment.scala | 31 +-
.../api/avro/AvroExternalJarProgramITCase.java | 38 +-
.../environment/RemoteStreamEnvironment.java | 24 +-
.../environment/StreamContextEnvironment.java | 28 +-
.../environment/StreamExecutionEnvironment.java | 11 +-
.../api/environment/StreamPlanEnvironment.java | 7 +-
.../flink/tez/client/LocalTezEnvironment.java | 5 +
.../flink/tez/client/RemoteTezEnvironment.java | 5 +
.../apache/flink/tez/client/TezExecutor.java | 21 +
.../apache/flink/test/util/TestEnvironment.java | 4 +
.../clients/examples/LocalExecutorITCase.java | 3 +-
.../RemoteEnvironmentITCase.java | 2 +-
.../jsonplan/DumpCompiledPlanTest.java | 6 +-
.../jsonplan/JsonJobGraphGenerationTest.java | 4 +
.../jobmanager/JobManagerFailsITCase.scala | 1 -
.../org/apache/flink/yarn/YarnTestBase.java | 2 +-
.../org/apache/flink/yarn/FlinkYarnCluster.java | 2 +
62 files changed, 2114 insertions(+), 1040 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index fc4d98a..f0e6c4f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -57,10 +57,13 @@ import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
import org.apache.flink.optimizer.plan.FlinkPlan;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
+import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -297,44 +300,51 @@ public class CliFrontend {
int userParallelism = options.getParallelism();
LOG.debug("User parallelism is set to {}", userParallelism);
- Client client = getClient(options, program.getUserCodeClassLoader(), program.getMainClassName(), userParallelism);
+ Client client = getClient(options, program.getMainClassName(), userParallelism);
client.setPrintStatusDuringExecution(options.getStdoutLogging());
LOG.debug("Client slots is set to {}", client.getMaxSlots());
- if(client.getMaxSlots() != -1 && userParallelism == -1) {
- logAndSysout("Using the parallelism provided by the remote cluster ("+client.getMaxSlots()+"). " +
- "To use another parallelism, set it at the ./bin/flink client.");
- userParallelism = client.getMaxSlots();
- }
- // check if detached per job yarn cluster is used to start flink
- if(yarnCluster != null && yarnCluster.isDetached()) {
- logAndSysout("The Flink YARN client has been started in detached mode. In order to stop " +
- "Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
- "yarn application -kill " + yarnCluster.getApplicationId() + "\n" +
- "Please also note that the temporary files of the YARN session in the home directoy will not be removed.");
- exitCode = executeProgram(program, client, userParallelism, false);
- } else {
- // regular (blocking) execution.
- exitCode = executeProgram(program, client, userParallelism, true);
- }
+ try {
+ if (client.getMaxSlots() != -1 && userParallelism == -1) {
+ logAndSysout("Using the parallelism provided by the remote cluster ("+client.getMaxSlots()+"). " +
+ "To use another parallelism, set it at the ./bin/flink client.");
+ userParallelism = client.getMaxSlots();
+ }
- // show YARN cluster status if its not a detached YARN cluster.
- if (yarnCluster != null && !yarnCluster.isDetached()) {
- List<String> msgs = yarnCluster.getNewMessages();
- if (msgs != null && msgs.size() > 1) {
+ // check if detached per job yarn cluster is used to start flink
+ if (yarnCluster != null && yarnCluster.isDetached()) {
+ logAndSysout("The Flink YARN client has been started in detached mode. In order to stop " +
+ "Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
+ "yarn application -kill " + yarnCluster.getApplicationId() + "\n" +
+ "Please also note that the temporary files of the YARN session in the home directoy will not be removed.");
+ exitCode = executeProgramDetached(program, client, userParallelism);
+ }
+ else {
+ // regular (blocking) execution.
+ exitCode = executeProgramBlocking(program, client, userParallelism);
+ }
+
+ // show YARN cluster status if its not a detached YARN cluster.
+ if (yarnCluster != null && !yarnCluster.isDetached()) {
+ List<String> msgs = yarnCluster.getNewMessages();
+ if (msgs != null && msgs.size() > 1) {
- logAndSysout("The following messages were created by the YARN cluster while running the Job:");
- for (String msg : msgs) {
- logAndSysout(msg);
+ logAndSysout("The following messages were created by the YARN cluster while running the Job:");
+ for (String msg : msgs) {
+ logAndSysout(msg);
+ }
+ }
+ if (yarnCluster.hasFailed()) {
+ logAndSysout("YARN cluster is in failed state!");
+ logAndSysout("YARN Diagnostics: " + yarnCluster.getDiagnostics());
}
}
- if (yarnCluster.hasFailed()) {
- logAndSysout("YARN cluster is in failed state!");
- logAndSysout("YARN Diagnostics: " + yarnCluster.getDiagnostics());
- }
- }
- return exitCode;
+ return exitCode;
+ }
+ finally {
+ client.shutdown();
+ }
}
catch (Throwable t) {
return handleError(t);
@@ -395,8 +405,10 @@ public class CliFrontend {
int parallelism = options.getParallelism();
LOG.info("Creating program plan dump");
- Client client = getClient(options, program.getUserCodeClassLoader(), program.getMainClassName(), parallelism);
- FlinkPlan flinkPlan = client.getOptimizedPlan(program, parallelism);
+
+ Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
+
+ FlinkPlan flinkPlan = Client.getOptimizedPlan(compiler, program, parallelism);
if (webFrontend) {
this.optimizedPlan = flinkPlan;
@@ -425,6 +437,8 @@ public class CliFrontend {
}
}
return 0;
+
+
}
catch (Throwable t) {
return handleError(t);
@@ -623,52 +637,65 @@ public class CliFrontend {
// Interaction with programs and JobManager
// --------------------------------------------------------------------------------------------
- protected int executeProgram(PackagedProgram program, Client client, int parallelism, boolean wait) {
- LOG.info("Starting execution of program");
- JobSubmissionResult execResult;
+ protected int executeProgramDetached(PackagedProgram program, Client client, int parallelism) {
+ JobSubmissionResult result;
try {
- execResult = client.run(program, parallelism, wait);
- }
- catch (ProgramInvocationException e) {
+ result = client.runDetached(program, parallelism);
+ } catch (ProgramInvocationException e) {
return handleError(e);
- }
- finally {
+ } finally {
program.deleteExtractedLibraries();
}
- if(wait) {
- LOG.info("Program execution finished");
- }
-
- // we come here after the job has finished (or the job has been submitted)
- if (execResult != null) {
+ if (result != null) {
// if the job has been submitted to a detached YARN cluster, there won't be any
// exec results, but the object will be set (for the job id)
if (yarnCluster != null && yarnCluster.isDetached()) {
- if(execResult.getJobID() == null) {
- throw new RuntimeException("Error while starting job. No Job ID set.");
- }
- yarnCluster.stopAfterJob(execResult.getJobID());
+
+ yarnCluster.stopAfterJob(result.getJobID());
yarnCluster.disconnect();
- if(!webFrontend) {
- System.out.println("The Job has been submitted with JobID "+execResult.getJobID());
+ if (!webFrontend) {
+ System.out.println("The Job has been submitted with JobID " + result.getJobID());
}
return 0;
- }
- if (execResult instanceof JobExecutionResult) {
- JobExecutionResult result = (JobExecutionResult) execResult;
- if(!webFrontend) {
- System.out.println("Job Runtime: " + result.getNetRuntime() + " ms");
- }
- Map<String, Object> accumulatorsResult = result.getAllAccumulatorResults();
- if (accumulatorsResult.size() > 0 && !webFrontend) {
- System.out.println("Accumulator Results: ");
- System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult));
- }
} else {
- LOG.info("The Job did not return an execution result");
+ throw new RuntimeException("Error while starting job. No Job ID set.");
+ }
+ }
+
+ return 0;
+ }
+
+ protected int executeProgramBlocking(PackagedProgram program, Client client, int parallelism) {
+ LOG.info("Starting execution of program");
+
+ JobExecutionResult result;
+ try {
+ client.setPrintStatusDuringExecution(true);
+ result = client.runBlocking(program, parallelism);
+ }
+ catch (ProgramInvocationException e) {
+ return handleError(e);
+ }
+ finally {
+ program.deleteExtractedLibraries();
+ }
+
+ LOG.info("Program execution finished");
+
+ if (result != null) {
+ if (!webFrontend) {
+ System.out.println("Job Runtime: " + result.getNetRuntime() + " ms");
+ }
+ Map<String, Object> accumulatorsResult = result.getAllAccumulatorResults();
+ if (accumulatorsResult.size() > 0 && !webFrontend) {
+ System.out.println("Accumulator Results: ");
+ System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult));
}
+ } else {
+ LOG.info("The Job did not return an execution result");
}
+
return 0;
}
@@ -767,7 +794,6 @@ public class CliFrontend {
* Retrieves a {@link Client} object from the given command line options and other parameters.
*
* @param options Command line options which contain JobManager address
- * @param classLoader Class loader to use by the Client
* @param programName Program name
* @param userParallelism Given user parallelism
* @return
@@ -775,12 +801,10 @@ public class CliFrontend {
*/
protected Client getClient(
CommandLineOptions options,
- ClassLoader classLoader,
String programName,
int userParallelism)
throws Exception {
- InetSocketAddress jobManagerAddress = null;
-
+ InetSocketAddress jobManagerAddress;
int maxSlots = -1;
if (YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) {
@@ -796,14 +820,16 @@ public class CliFrontend {
// the number of slots available from YARN:
int yarnTmSlots = flinkYarnClient.getTaskManagerSlots();
- if(yarnTmSlots == -1) {
+ if (yarnTmSlots == -1) {
yarnTmSlots = 1;
}
maxSlots = yarnTmSlots * flinkYarnClient.getTaskManagerCount();
- if(userParallelism != -1) {
+ if (userParallelism != -1) {
int slotsPerTM = userParallelism / flinkYarnClient.getTaskManagerCount();
- logAndSysout("The YARN cluster has "+maxSlots+" slots available, but the user requested a parallelism of "+userParallelism+" on YARN. " +
- "Each of the "+flinkYarnClient.getTaskManagerCount()+" TaskManagers will get "+slotsPerTM+" slots.");
+ logAndSysout("The YARN cluster has " + maxSlots + " slots available, " +
+ "but the user requested a parallelism of " + userParallelism + " on YARN. " +
+ "Each of the " + flinkYarnClient.getTaskManagerCount() + " TaskManagers " +
+ "will get "+slotsPerTM+" slots.");
flinkYarnClient.setTaskManagerSlots(slotsPerTM);
}
@@ -811,11 +837,12 @@ public class CliFrontend {
yarnCluster = flinkYarnClient.deploy();
yarnCluster.connectToCluster();
}
- catch(Exception e) {
+ catch (Exception e) {
throw new RuntimeException("Error deploying the YARN cluster", e);
}
jobManagerAddress = yarnCluster.getJobManagerAddress();
+ writeJobManagerAddressToConfig(jobManagerAddress);
logAndSysout("YARN cluster started");
logAndSysout("JobManager web interface address " + yarnCluster.getWebInterfaceURL());
@@ -847,14 +874,11 @@ public class CliFrontend {
else {
if(options.getJobManagerAddress() != null) {
jobManagerAddress = parseHostPortAddress(options.getJobManagerAddress());
+ writeJobManagerAddressToConfig(jobManagerAddress);
}
}
- if(jobManagerAddress != null) {
- writeJobManagerAddressToConfig(jobManagerAddress);
- }
-
- return new Client(config, classLoader, maxSlots);
+ return new Client(config, maxSlots);
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index cf08e0a..7928e53 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -22,11 +22,14 @@ import java.util.List;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.PlanExecutor;
import org.apache.flink.api.common.Program;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.optimizer.DataStatistics;
@@ -35,52 +38,66 @@ import org.apache.flink.optimizer.dag.DataSinkNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
/**
- * A class for executing a {@link Plan} on a local embedded Flink runtime instance.
+ * A PlanExecutor that runs Flink programs on a local embedded Flink runtime instance.
+ *
+ * <p>By simply calling the {@link #executePlan(org.apache.flink.api.common.Plan)} method,
+ * this executor still start up and shut down again immediately after the program finished.</p>
+ *
+ * <p>To use this executor to execute many dataflow programs that constitute one job together,
+ * then this executor needs to be explicitly started, to keep running across several executions.</p>
*/
public class LocalExecutor extends PlanExecutor {
- private static boolean DEFAULT_OVERWRITE = false;
+ private static final boolean DEFAULT_OVERWRITE = false;
private static final int DEFAULT_TASK_MANAGER_NUM_SLOTS = -1;
- private final Object lock = new Object(); // we lock to ensure singleton execution
-
+ /** we lock to ensure singleton execution */
+ private final Object lock = new Object();
+
+ /** The mini cluster on which to execute the local programs */
private LocalFlinkMiniCluster flink;
+ /** Custom user configuration for the execution */
private Configuration configuration;
- // ---------------------------------- config options ------------------------------------------
-
+ /** Config value for how many slots to provide in the local cluster */
private int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
+ /** Config flag whether to overwrite existing files by default */
private boolean defaultOverwriteFiles = DEFAULT_OVERWRITE;
-
- // --------------------------------------------------------------------------------------------
-
+
+ // ------------------------------------------------------------------------
+
public LocalExecutor() {
- if (!ExecutionEnvironment.localExecutionIsAllowed()) {
- throw new InvalidProgramException("The LocalEnvironment cannot be used when submitting a program through a client.");
- }
+ this(null);
}
public LocalExecutor(Configuration conf) {
- this();
- this.configuration = conf;
+ if (!ExecutionEnvironment.localExecutionIsAllowed()) {
+ throw new InvalidProgramException(
+ "The LocalEnvironment cannot be used when submitting a program through a client.");
+ }
+
+ this.configuration = conf != null ? conf : new Configuration();
}
+ // ------------------------------------------------------------------------
+ // Configuration
+ // ------------------------------------------------------------------------
-
public boolean isDefaultOverwriteFiles() {
return defaultOverwriteFiles;
}
-
+
public void setDefaultOverwriteFiles(boolean defaultOverwriteFiles) {
this.defaultOverwriteFiles = defaultOverwriteFiles;
}
-
+
public void setTaskManagerNumSlots(int taskManagerNumSlots) {
this.taskManagerNumSlots = taskManagerNumSlots;
}
@@ -88,51 +105,48 @@ public class LocalExecutor extends PlanExecutor {
public int getTaskManagerNumSlots() {
return this.taskManagerNumSlots;
}
-
- // --------------------------------------------------------------------------------------------
- public static Configuration createConfiguration(LocalExecutor le) {
- Configuration configuration = new Configuration();
- configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, le.getTaskManagerNumSlots());
- configuration.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, le.isDefaultOverwriteFiles());
- return configuration;
- }
+ // --------------------------------------------------------------------------------------------
+ @Override
public void start() throws Exception {
- synchronized (this.lock) {
- if (this.flink == null) {
-
+ synchronized (lock) {
+ if (flink == null) {
// create the embedded runtime
- Configuration configuration = createConfiguration(this);
- if(this.configuration != null) {
+ Configuration configuration = createConfiguration();
+ if (this.configuration != null) {
configuration.addAll(this.configuration);
}
// start it up
- this.flink = new LocalFlinkMiniCluster(configuration, true);
+ flink = new LocalFlinkMiniCluster(configuration, true);
this.flink.start();
} else {
throw new IllegalStateException("The local executor was already started.");
}
}
}
-
- /**
- * Stop the local executor instance. You should not call executePlan after this.
- */
+
+ @Override
public void stop() throws Exception {
- synchronized (this.lock) {
- if (this.flink != null) {
- this.flink.stop();
- this.flink = null;
- } else {
- throw new IllegalStateException("The local executor was not started.");
+ synchronized (lock) {
+ if (flink != null) {
+ flink.stop();
+ flink = null;
}
}
}
+ @Override
+ public boolean isRunning() {
+ return flink != null;
+ }
+
/**
- * Execute the given plan on the local Nephele instance, wait for the job to
- * finish and return the runtime in milliseconds.
+ * Executes the given program on a local runtime and waits for the job to finish.
+ *
+ * <p>If the executor has not been started before, this starts the executor and shuts it down
+ * after the job finished. If the job runs in session mode, the executor is kept alive until
+ * no more references to the executor exist.</p>
*
* @param plan The plan of the program to execute.
* @return The net runtime of the program, in milliseconds.
@@ -145,15 +159,15 @@ public class LocalExecutor extends PlanExecutor {
if (plan == null) {
throw new IllegalArgumentException("The plan may not be null.");
}
-
+
synchronized (this.lock) {
-
+
// check if we start a session dedicated for this execution
final boolean shutDownAtEnd;
- if (this.flink == null) {
- // we start a session just for us now
+
+ if (flink == null) {
shutDownAtEnd = true;
-
+
// configure the number of local slots equal to the parallelism of the local plan
if (this.taskManagerNumSlots == DEFAULT_TASK_MANAGER_NUM_SLOTS) {
int maxParallelism = plan.getMaximumParallelism();
@@ -161,9 +175,11 @@ public class LocalExecutor extends PlanExecutor {
this.taskManagerNumSlots = maxParallelism;
}
}
-
+
+ // start the cluster for us
start();
- } else {
+ }
+ else {
// we use the existing session
shutDownAtEnd = false;
}
@@ -173,10 +189,10 @@ public class LocalExecutor extends PlanExecutor {
Optimizer pc = new Optimizer(new DataStatistics(), configuration);
OptimizedPlan op = pc.compile(plan);
-
+
JobGraphGenerator jgg = new JobGraphGenerator(configuration);
- JobGraph jobGraph = jgg.compileJobGraph(op);
-
+ JobGraph jobGraph = jgg.compileJobGraph(op, plan.getJobId());
+
boolean sysoutPrint = isPrintingStatusDuringExecution();
return flink.submitJobAndWait(jobGraph, sysoutPrint);
}
@@ -189,32 +205,50 @@ public class LocalExecutor extends PlanExecutor {
}
/**
- * Returns a JSON dump of the optimized plan.
- *
- * @param plan
- * The program's plan.
- * @return JSON dump of the optimized plan.
- * @throws Exception
+ * Creates a JSON representation of the given dataflow's execution plan.
+ *
+ * @param plan The dataflow plan.
+ * @return The dataflow's execution plan, as a JSON string.
+ * @throws Exception Thrown, if the optimization process that creates the execution plan failed.
*/
@Override
public String getOptimizerPlanAsJSON(Plan plan) throws Exception {
- Optimizer pc = new Optimizer(new DataStatistics(), createConfiguration(this));
+ final int parallelism = plan.getDefaultParallelism() == -1 ? 1 : plan.getDefaultParallelism();
+
+ Optimizer pc = new Optimizer(new DataStatistics(), this.configuration);
+ pc.setDefaultParallelism(parallelism);
OptimizedPlan op = pc.compile(plan);
- PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();
-
- return gen.getOptimizerPlanAsJSON(op);
+
+ return new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(op);
}
-
+
+ @Override
+ public void endSession(JobID jobID) throws Exception {
+ LocalFlinkMiniCluster flink = this.flink;
+ if (flink != null) {
+ ActorGateway leaderGateway = flink.getLeaderGateway(AkkaUtils.getDefaultTimeout());
+ leaderGateway.tell(new JobManagerMessages.RemoveCachedJob(jobID));
+ }
+ }
+
+ private Configuration createConfiguration() {
+ Configuration configuration = new Configuration();
+ configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, getTaskManagerNumSlots());
+ configuration.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, isDefaultOverwriteFiles());
+ return configuration;
+ }
+
+
// --------------------------------------------------------------------------------------------
// Static variants that internally bring up an instance and shut it down after the execution
// --------------------------------------------------------------------------------------------
-
+
/**
- * Executes the program described by the given plan assembler.
+ * Executes the given program.
*
- * @param pa The program's plan assembler.
+ * @param pa The program.
* @param args The parameters.
- * @return The net runtime of the program, in milliseconds.
+ * @return The execution result of the program.
*
* @throws Exception Thrown, if either the startup of the local execution context, or the execution
* caused an exception.
@@ -222,57 +256,45 @@ public class LocalExecutor extends PlanExecutor {
public static JobExecutionResult execute(Program pa, String... args) throws Exception {
return execute(pa.getPlan(args));
}
-
+
/**
- * Executes the program represented by the given Pact plan.
+ * Executes the given dataflow plan.
*
- * @param plan The program's plan.
- * @return The net runtime of the program, in milliseconds.
+ * @param plan The dataflow plan.
+ * @return The execution result.
*
* @throws Exception Thrown, if either the startup of the local execution context, or the execution
* caused an exception.
*/
public static JobExecutionResult execute(Plan plan) throws Exception {
- LocalExecutor exec = new LocalExecutor();
- try {
- exec.start();
- return exec.executePlan(plan);
- } finally {
- exec.stop();
- }
+ return new LocalExecutor().executePlan(plan);
}
/**
- * Returns a JSON dump of the optimized plan.
+ * Creates a JSON representation of the given dataflow's execution plan.
*
- * @param plan
- * The program's plan.
- * @return JSON dump of the optimized plan.
- * @throws Exception
+ * @param plan The dataflow plan.
+ * @return The dataflow's execution plan, as a JSON string.
+ * @throws Exception Thrown, if the optimization process that creates the execution plan failed.
*/
public static String optimizerPlanAsJSON(Plan plan) throws Exception {
- LocalExecutor exec = new LocalExecutor();
- try {
- exec.start();
- Optimizer pc = new Optimizer(new DataStatistics(), exec.flink.configuration());
- OptimizedPlan op = pc.compile(plan);
- PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();
-
- return gen.getOptimizerPlanAsJSON(op);
- } finally {
- exec.stop();
- }
+ final int parallelism = plan.getDefaultParallelism() == -1 ? 1 : plan.getDefaultParallelism();
+
+ Optimizer pc = new Optimizer(new DataStatistics(), new Configuration());
+ pc.setDefaultParallelism(parallelism);
+ OptimizedPlan op = pc.compile(plan);
+
+ return new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(op);
}
/**
- * Return unoptimized plan as JSON.
+ * Creates a JSON representation of the given dataflow plan.
*
- * @param plan The program plan.
- * @return The plan as a JSON object.
+ * @param plan The dataflow plan.
+ * @return The dataflow plan (prior to optimization) as a JSON string.
*/
public static String getPlanAsJSON(Plan plan) {
- PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();
List<DataSinkNode> sinks = Optimizer.createPreOptimizedPlan(plan);
- return gen.getPactPlanAsJSON(sinks);
+ return new PlanJSONDumpGenerator().getPactPlanAsJSON(sinks);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index 20169f6..e8e9ade 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -18,7 +18,6 @@
package org.apache.flink.client;
-import java.io.File;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
@@ -26,36 +25,41 @@ import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.PlanExecutor;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.JobWithJars;
-import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.configuration.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* The RemoteExecutor is a {@link org.apache.flink.api.common.PlanExecutor} that takes the program
* and ships it to a remote Flink cluster for execution.
*
- * The RemoteExecutor is pointed at the JobManager and gets the program and (if necessary) the
- * set of libraries that need to be shipped together with the program.
+ * <p>The RemoteExecutor is pointed at the JobManager and gets the program and (if necessary) the
+ * set of libraries that need to be shipped together with the program.</p>
*
- * The RemoteExecutor is used in the {@link org.apache.flink.api.java.RemoteEnvironment} to
- * remotely execute program parts.
+ * <p>The RemoteExecutor is used in the {@link org.apache.flink.api.java.RemoteEnvironment} to
+ * remotely execute program parts.</p>
*/
public class RemoteExecutor extends PlanExecutor {
+
+ private final Object lock = new Object();
- private static final Logger LOG = LoggerFactory.getLogger(RemoteExecutor.class);
-
private final List<String> jarFiles;
private final Configuration clientConfiguration;
+
+ private Client client;
+
+ private int defaultParallelism = 1;
+
public RemoteExecutor(String hostname, int port) {
this(hostname, port, Collections.<String>emptyList(), new Configuration());
@@ -97,51 +101,148 @@ public class RemoteExecutor extends PlanExecutor {
clientConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, inet.getPort());
}
+ // ------------------------------------------------------------------------
+ // Properties
+ // ------------------------------------------------------------------------
+
+ /**
+ * Sets the parallelism that will be used when the program does not define
+ * any parallelism at all.
+ *
+ * @param defaultParallelism The default parallelism for the executor.
+ */
+ public void setDefaultParallelism(int defaultParallelism) {
+ if (defaultParallelism < 1) {
+ throw new IllegalArgumentException("The default parallelism must be at least one");
+ }
+ this.defaultParallelism = defaultParallelism;
+ }
+
+ /**
+ * Gets the parallelism that will be used when the program does not define
+ * any parallelism at all.
+ *
+ * @return The default parallelism for the executor.
+ */
+ public int getDefaultParallelism() {
+ return defaultParallelism;
+ }
+
+ // ------------------------------------------------------------------------
+ // Startup & Shutdown
+ // ------------------------------------------------------------------------
+
+
+ @Override
+ public void start() throws Exception {
+ synchronized (lock) {
+ if (client == null) {
+ client = new Client(clientConfiguration);
+ client.setPrintStatusDuringExecution(isPrintingStatusDuringExecution());
+ }
+ else {
+ throw new IllegalStateException("The remote executor was already started.");
+ }
+ }
+ }
+
+ @Override
+ public void stop() throws Exception {
+ synchronized (lock) {
+ if (client != null) {
+ client.shutdown();
+ client = null;
+ }
+ }
+ }
+
+ @Override
+ public boolean isRunning() {
+ return client != null;
+ }
+
+ // ------------------------------------------------------------------------
+ // Executing programs
+ // ------------------------------------------------------------------------
+
@Override
public JobExecutionResult executePlan(Plan plan) throws Exception {
+ if (plan == null) {
+ throw new IllegalArgumentException("The plan may not be null.");
+ }
+
JobWithJars p = new JobWithJars(plan, this.jarFiles);
return executePlanWithJars(p);
}
-
- public JobExecutionResult executePlanWithJars(JobWithJars p) throws Exception {
- Client c = new Client(clientConfiguration, p.getUserCodeClassLoader(), -1);
- c.setPrintStatusDuringExecution(isPrintingStatusDuringExecution());
-
- JobSubmissionResult result = c.run(p, -1, true);
- if (result instanceof JobExecutionResult) {
- return (JobExecutionResult) result;
- } else {
- LOG.warn("The Client didn't return a JobExecutionResult");
- return new JobExecutionResult(result.getJobID(), -1, null);
+
+ public JobExecutionResult executePlanWithJars(JobWithJars program) throws Exception {
+ if (program == null) {
+ throw new IllegalArgumentException("The job may not be null.");
}
- }
- public JobExecutionResult executeJar(String jarPath, String assemblerClass, String... args) throws Exception {
- File jarFile = new File(jarPath);
- PackagedProgram program = new PackagedProgram(jarFile, assemblerClass, args);
-
- Client c = new Client(clientConfiguration, program.getUserCodeClassLoader(), -1);
- c.setPrintStatusDuringExecution(isPrintingStatusDuringExecution());
-
- JobSubmissionResult result = c.run(program.getPlanWithJars(), -1, true);
- if(result instanceof JobExecutionResult) {
- return (JobExecutionResult) result;
- } else {
- LOG.warn("The Client didn't return a JobExecutionResult");
- return new JobExecutionResult(result.getJobID(), -1, null);
+ synchronized (this.lock) {
+ // check if we start a session dedicated for this execution
+ final boolean shutDownAtEnd;
+
+ if (client == null) {
+ shutDownAtEnd = true;
+ // start the executor for us
+ start();
+ }
+ else {
+ // we use the existing session
+ shutDownAtEnd = false;
+ }
+
+ try {
+ return client.runBlocking(program, defaultParallelism);
+ }
+ finally {
+ if (shutDownAtEnd) {
+ stop();
+ }
+ }
}
}
@Override
public String getOptimizerPlanAsJSON(Plan plan) throws Exception {
- JobWithJars p = new JobWithJars(plan, this.jarFiles);
- Client c = new Client(clientConfiguration, p.getUserCodeClassLoader(), -1);
-
- OptimizedPlan op = (OptimizedPlan) c.getOptimizedPlan(p, -1);
- PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
- return jsonGen.getOptimizerPlanAsJSON(op);
+ Optimizer opt = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), new Configuration());
+ OptimizedPlan optPlan = opt.compile(plan);
+ return new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(optPlan);
}
-
+
+ @Override
+ public void endSession(JobID jobID) throws Exception {
+ if (jobID == null) {
+ throw new NullPointerException("The supplied jobID must not be null.");
+ }
+
+ synchronized (this.lock) {
+ // check if we need to start a client just for ending this session
+ final boolean shutDownAtEnd;
+
+ if (client == null) {
+ shutDownAtEnd = true;
+ // start the executor for us
+ start();
+ }
+ else {
+ // we use the existing session
+ shutDownAtEnd = false;
+ }
+
+ try {
+ client.endSession(jobID);
+ }
+ finally {
+ if (shutDownAtEnd) {
+ stop();
+ }
+ }
+ }
+ }
+
// --------------------------------------------------------------------------------------------
// Utilities
// --------------------------------------------------------------------------------------------
@@ -168,5 +269,4 @@ public class RemoteExecutor extends PlanExecutor {
}
return new InetSocketAddress(host, port);
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index e7464c8..6c886fe 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -18,10 +18,9 @@
package org.apache.flink.client.program;
-import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
-import java.io.PrintStream;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -32,8 +31,6 @@ import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.ExecutionEnvironmentFactory;
import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
@@ -65,7 +62,6 @@ import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import akka.actor.ActorSystem;
-import com.google.common.base.Preconditions;
/**
* Encapsulates the functionality necessary to submit a program to a remote cluster.
@@ -78,62 +74,139 @@ public class Client {
* The configuration to use for the client (optimizer, timeouts, ...) and to connect to the
* JobManager.
*/
- private final Configuration configuration;
-
/** The optimizer used in the optimization of batch programs */
- private final Optimizer compiler;
+ final Optimizer compiler;
+
+ /** The actor system used to communicate with the JobManager */
+ private final ActorSystem actorSystem;
- /** The class loader to use for classes from the user program (e.g., functions and data types) */
- private final ClassLoader userCodeClassLoader;
+ /** The actor reference to the JobManager */
+ private final ActorGateway jobManagerGateway;
+
+ /** The timeout for communication between the client and the JobManager */
+ private final FiniteDuration timeout;
+
+ /**
+ * If != -1, this field specifies the total number of available slots on the cluster
+ * connected to the client.
+ */
+ private final int maxSlots;
/** Flag indicating whether to sysout print execution updates */
private boolean printStatusDuringExecution = true;
/**
- * If != -1, this field specifies the total number of available slots on the cluster
- * connected to the client.
+ * For interactive invocations, the Job ID is only available after the ContextEnvironment has
+ * been run inside the user JAR. We pass the Client to every instance of the ContextEnvironment
+ * which lets us access the last JobID here.
*/
- private int maxSlots;
+ private JobID lastJobID;
- /** ID of the last job submitted with this client. */
- private JobID lastJobId = null;
-
-
// ------------------------------------------------------------------------
// Construction
// ------------------------------------------------------------------------
/**
* Creates a instance that submits the programs to the JobManager defined in the
- * configuration. It sets the maximum number of slots to unknown (= -1).
+ * configuration. This method will try to resolve the JobManager hostname and throw an exception
+ * if that is not possible.
*
- * @param config The config used to obtain the JobManager's address.
- * @param userCodeClassLoader The class loader to use for loading user code classes.
+ * @param config The config used to obtain the job-manager's address, and used to configure the optimizer.
+ *
+ * @throws java.io.IOException Thrown, if the client's actor system could not be started.
+ * @throws java.net.UnknownHostException Thrown, if the JobManager's hostname could not be resolved.
*/
- public Client(Configuration config, ClassLoader userCodeClassLoader) {
- this(config, userCodeClassLoader, -1);
+ public Client(Configuration config) throws IOException {
+ this(config, -1);
}
/**
- * Creates a instance that submits the programs to the JobManager defined in the
- * configuration.
+ * Creates a new instance of the class that submits the jobs to a job-manager
+ * at the given address using the default port.
*
- * @param config The config used to obtain the JobManager's address.
- * @param userCodeClassLoader The class loader to use for loading user code classes.
- * @param maxSlots The number of maxSlots on the cluster if != -1
+ * @param config The configuration for the client-side processes, like the optimizer.
+ * @param maxSlots The number of maxSlots on the cluster if != -1.
+ *
+ * @throws java.io.IOException Thrown, if the client's actor system could not be started.
+ * @throws java.net.UnknownHostException Thrown, if the JobManager's hostname could not be resolved.
*/
- public Client(Configuration config, ClassLoader userCodeClassLoader, int maxSlots) {
- Preconditions.checkNotNull(config, "Configuration is null");
- Preconditions.checkNotNull(userCodeClassLoader, "User code ClassLoader is null");
-
- this.configuration = config;
- this.userCodeClassLoader = userCodeClassLoader;
+ public Client(Configuration config, int maxSlots) throws IOException {
- this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), configuration);
+ this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
this.maxSlots = maxSlots;
+
+ LOG.info("Starting client actor system");
+
+ try {
+ this.actorSystem = JobClient.startJobClientActorSystem(config);
+ } catch (Exception e) {
+ throw new IOException("Could start client actor system.", e);
+ }
+
+ // from here on, we need to make sure the actor system is shut down on error
+ boolean success = false;
+
+ try {
+
+ FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(config);
+ this.timeout = AkkaUtils.getTimeout(config);
+
+ LOG.info("Looking up JobManager");
+ LeaderRetrievalService leaderRetrievalService;
+
+ try {
+ leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(config);
+ } catch (Exception e) {
+ throw new IOException("Could not create the leader retrieval service.", e);
+ }
+
+ try {
+ this.jobManagerGateway = LeaderRetrievalUtils.retrieveLeaderGateway(
+ leaderRetrievalService,
+ actorSystem,
+ lookupTimeout);
+ } catch (LeaderRetrievalException e) {
+ throw new IOException("Failed to retrieve JobManager gateway", e);
+ }
+
+ LOG.info("Leading JobManager actor system address is " + this.jobManagerGateway.path());
+
+ LOG.info("JobManager runs at " + this.jobManagerGateway.path());
+
+ LOG.info("Communication between client and JobManager will have a timeout of " + this.timeout);
+ success = true;
+ } finally {
+ if (!success) {
+ try {
+ this.actorSystem.shutdown();
+
+ // wait at most for 30 seconds, to work around an occasional akka problem
+ actorSystem.awaitTermination(new FiniteDuration(30, TimeUnit.SECONDS));
+ } catch (Throwable t) {
+ LOG.error("Shutting down actor system after error caused another error", t);
+ }
+ }
+ }
}
+ // ------------------------------------------------------------------------
+ // Startup & Shutdown
+ // ------------------------------------------------------------------------
/**
+ * Shuts down the client. This stops the internal actor system and actors.
+ */
+ public void shutdown() {
+ if (!this.actorSystem.isTerminated()) {
+ this.actorSystem.shutdown();
+ this.actorSystem.awaitTermination();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Configuration
+ // ------------------------------------------------------------------------
+
+ /**
* Configures whether the client should print progress updates during the execution to {@code System.out}.
* All updates are logged via the SLF4J loggers regardless of this setting.
*
@@ -159,118 +232,84 @@ public class Client {
}
// ------------------------------------------------------------------------
- // Compilation and Submission
+ // Access to the Program's Plan
// ------------------------------------------------------------------------
- public String getOptimizedPlanAsJson(PackagedProgram prog, int parallelism) throws CompilerException, ProgramInvocationException {
+ public static String getOptimizedPlanAsJson(Optimizer compiler, PackagedProgram prog, int parallelism)
+ throws CompilerException, ProgramInvocationException
+ {
PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
- return jsonGen.getOptimizerPlanAsJSON((OptimizedPlan) getOptimizedPlan(prog, parallelism));
+ return jsonGen.getOptimizerPlanAsJSON((OptimizedPlan) getOptimizedPlan(compiler, prog, parallelism));
}
-
- public FlinkPlan getOptimizedPlan(PackagedProgram prog, int parallelism) throws CompilerException, ProgramInvocationException {
+
+ public static FlinkPlan getOptimizedPlan(Optimizer compiler, PackagedProgram prog, int parallelism)
+ throws CompilerException, ProgramInvocationException
+ {
Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
if (prog.isUsingProgramEntryPoint()) {
- return getOptimizedPlan(prog.getPlanWithJars(), parallelism);
- }
- else if (prog.isUsingInteractiveMode()) {
+ return getOptimizedPlan(compiler, prog.getPlanWithJars(), parallelism);
+ } else if (prog.isUsingInteractiveMode()) {
// temporary hack to support the optimizer plan preview
- OptimizerPlanEnvironment env = new OptimizerPlanEnvironment(this.compiler);
+ OptimizerPlanEnvironment env = new OptimizerPlanEnvironment(compiler);
if (parallelism > 0) {
env.setParallelism(parallelism);
}
- env.setAsContext();
-
- // temporarily write syserr and sysout to a byte array.
- PrintStream originalOut = System.out;
- PrintStream originalErr = System.err;
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- System.setOut(new PrintStream(baos));
- ByteArrayOutputStream baes = new ByteArrayOutputStream();
- System.setErr(new PrintStream(baes));
- try {
- ContextEnvironment.enableLocalExecution(false);
- prog.invokeInteractiveModeForExecution();
- }
- catch (ProgramInvocationException e) {
- throw e;
- }
- catch (Throwable t) {
- // the invocation gets aborted with the preview plan
- if (env.optimizerPlan != null) {
- return env.optimizerPlan;
- } else {
- throw new ProgramInvocationException("The program caused an error: ", t);
- }
- }
- finally {
- ContextEnvironment.enableLocalExecution(true);
- System.setOut(originalOut);
- System.setErr(originalErr);
- System.err.println(baes);
- System.out.println(baos);
- }
-
- throw new ProgramInvocationException(
- "The program plan could not be fetched - the program aborted pre-maturely.\n"
- + "System.err: " + baes.toString() + '\n'
- + "System.out: " + baos.toString() + '\n');
- }
- else {
- throw new RuntimeException();
+
+ return env.getOptimizedPlan(prog);
+ } else {
+ throw new RuntimeException("Couldn't determine program mode.");
}
}
-
- public FlinkPlan getOptimizedPlan(Plan p, int parallelism) throws CompilerException {
+
+ public static OptimizedPlan getOptimizedPlan(Optimizer compiler, Plan p, int parallelism) throws CompilerException {
if (parallelism > 0 && p.getDefaultParallelism() <= 0) {
- LOG.debug("Changing plan default parallelism from {} to {}",p.getDefaultParallelism(), parallelism);
+ LOG.debug("Changing plan default parallelism from {} to {}", p.getDefaultParallelism(), parallelism);
p.setDefaultParallelism(parallelism);
}
LOG.debug("Set parallelism {}, plan default parallelism {}", parallelism, p.getDefaultParallelism());
- return this.compiler.compile(p);
- }
-
-
- /**
- * Creates the optimized plan for a given program, using this client's compiler.
- *
- * @param prog The program to be compiled.
- * @return The compiled and optimized plan, as returned by the compiler.
- * @throws CompilerException Thrown, if the compiler encounters an illegal situation.
- * @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file.
- */
- public FlinkPlan getOptimizedPlan(JobWithJars prog, int parallelism) throws CompilerException, ProgramInvocationException {
- return getOptimizedPlan(prog.getPlan(), parallelism);
- }
-
- public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan) throws ProgramInvocationException {
- return getJobGraph(optPlan, prog.getAllLibraries());
+ return compiler.compile(p);
}
-
- private JobGraph getJobGraph(FlinkPlan optPlan, List<File> jarFiles) {
- JobGraph job;
- if (optPlan instanceof StreamingPlan) {
- job = ((StreamingPlan) optPlan).getJobGraph();
- } else {
- JobGraphGenerator gen = new JobGraphGenerator(this.configuration);
- job = gen.compileJobGraph((OptimizedPlan) optPlan);
- }
- for (File jar : jarFiles) {
- job.addJar(new Path(jar.getAbsolutePath()));
+ // ------------------------------------------------------------------------
+ // Program submission / execution
+ // ------------------------------------------------------------------------
+
+ public JobExecutionResult runBlocking(PackagedProgram prog, int parallelism) throws ProgramInvocationException {
+ Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
+ if (prog.isUsingProgramEntryPoint()) {
+ return runBlocking(prog.getPlanWithJars(), parallelism);
}
+ else if (prog.isUsingInteractiveMode()) {
+ LOG.info("Starting program in interactive mode");
+ ContextEnvironment.setAsContext(this, prog.getAllLibraries(), prog.getUserCodeClassLoader(), parallelism, true);
+ ContextEnvironment.enableLocalExecution(false);
- return job;
+ // invoke here
+ try {
+ prog.invokeInteractiveModeForExecution();
+ }
+ finally {
+ ContextEnvironment.enableLocalExecution(true);
+ }
+
+ return JobExecutionResult.fromJobSubmissionResult(new JobSubmissionResult(lastJobID));
+ }
+ else {
+ throw new RuntimeException();
+ }
}
- public JobSubmissionResult run(final PackagedProgram prog, int parallelism, boolean wait) throws ProgramInvocationException {
+ public JobSubmissionResult runDetached(PackagedProgram prog, int parallelism)
+ throws ProgramInvocationException
+ {
Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
if (prog.isUsingProgramEntryPoint()) {
- return run(prog.getPlanWithJars(), parallelism, wait);
+ return runDetached(prog.getPlanWithJars(), parallelism);
}
else if (prog.isUsingInteractiveMode()) {
LOG.info("Starting program in interactive mode");
- ContextEnvironment.setAsContext(this, prog.getAllLibraries(), prog.getUserCodeClassLoader(), parallelism, wait);
+ ContextEnvironment.setAsContext(this, prog.getAllLibraries(), prog.getUserCodeClassLoader(), parallelism, false);
ContextEnvironment.enableLocalExecution(false);
// invoke here
@@ -281,113 +320,108 @@ public class Client {
ContextEnvironment.enableLocalExecution(true);
}
- // Job id has been set in the Client passed to the ContextEnvironment
- return new JobSubmissionResult(lastJobId);
+ return new JobSubmissionResult(lastJobID);
}
else {
- throw new RuntimeException();
+ throw new RuntimeException("PackagedProgram does not have a valid invocation mode.");
}
}
-
- public JobSubmissionResult run(PackagedProgram prog, OptimizedPlan optimizedPlan, boolean wait) throws ProgramInvocationException {
- return run(optimizedPlan, prog.getAllLibraries(), wait);
- }
-
/**
- * Runs a program on Flink cluster whose job-manager is configured in this client's configuration.
- * This method involves all steps, from compiling, job-graph generation to submission.
- *
- * @param prog The program to be executed.
+ * Runs a program on the Flink cluster to which this client is connected. The call blocks until the
+ * execution is complete, and returns afterwards.
+ *
+ * @param program The program to be executed.
* @param parallelism The default parallelism to use when running the program. The default parallelism is used
* when the program does not set a parallelism by itself.
- * @param wait A flag that indicates whether this function call should block until the program execution is done.
+ *
* @throws CompilerException Thrown, if the compiler encounters an illegal situation.
* @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file,
* or if the submission failed. That might be either due to an I/O problem,
* i.e. the job-manager is unreachable, or due to the fact that the
* parallel execution failed.
*/
- public JobSubmissionResult run(JobWithJars prog, int parallelism, boolean wait) throws CompilerException, ProgramInvocationException {
- return run((OptimizedPlan) getOptimizedPlan(prog, parallelism), prog.getJarFiles(), wait);
+ public JobExecutionResult runBlocking(JobWithJars program, int parallelism)
+ throws CompilerException, ProgramInvocationException
+ {
+ ClassLoader classLoader = program.getUserCodeClassLoader();
+ if (classLoader == null) {
+ throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
+ }
+
+ OptimizedPlan optPlan = getOptimizedPlan(compiler, program, parallelism);
+ return runBlocking(optPlan, program.getJarFiles(), classLoader);
+ }
+
+ /**
+ * Submits a program to the Flink cluster to which this client is connected. The call returns after the
+ * program was submitted and does not wait for the program to complete.
+ *
+ * @param program The program to be executed.
+ * @param parallelism The default parallelism to use when running the program. The default parallelism is used
+ * when the program does not set a parallelism by itself.
+ *
+ * @throws CompilerException Thrown, if the compiler encounters an illegal situation.
+ * @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file,
+ * or if the submission failed. That might be either due to an I/O problem,
+ * i.e. the job-manager is unreachable.
+ */
+ public JobSubmissionResult runDetached(JobWithJars program, int parallelism)
+ throws CompilerException, ProgramInvocationException
+ {
+ ClassLoader classLoader = program.getUserCodeClassLoader();
+ if (classLoader == null) {
+ throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
+ }
+
+ OptimizedPlan optimizedPlan = getOptimizedPlan(compiler, program, parallelism);
+ return runDetached(optimizedPlan, program.getJarFiles(), classLoader);
}
- public JobSubmissionResult run(OptimizedPlan compiledPlan, List<File> libraries, boolean wait) throws ProgramInvocationException {
+ public JobExecutionResult runBlocking(OptimizedPlan compiledPlan, List<File> libraries, ClassLoader classLoader)
+ throws ProgramInvocationException
+ {
JobGraph job = getJobGraph(compiledPlan, libraries);
- this.lastJobId = job.getJobID();
- return run(job, wait);
+ return runBlocking(job, classLoader);
}
- public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws ProgramInvocationException {
- this.lastJobId = jobGraph.getJobID();
-
- LOG.info("Starting client actor system");
- final ActorSystem actorSystem;
+ public JobSubmissionResult runDetached(OptimizedPlan compiledPlan, List<File> libraries, ClassLoader classLoader)
+ throws ProgramInvocationException
+ {
+ JobGraph job = getJobGraph(compiledPlan, libraries);
+ return runDetached(job, classLoader);
+ }
+
+ public JobExecutionResult runBlocking(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
+ LOG.info("Checking and uploading JAR files");
try {
- actorSystem = JobClient.startJobClientActorSystem(configuration);
+ JobClient.uploadJarFiles(jobGraph, jobManagerGateway, timeout);
+ } catch (IOException e) {
+ throw new ProgramInvocationException("Could not upload the program's JAR files to the JobManager.", e);
}
- catch (Exception e) {
- throw new ProgramInvocationException("Could start client actor system.", e);
+ try {
+ this.lastJobID = jobGraph.getJobID();
+ return JobClient.submitJobAndWait(actorSystem, jobManagerGateway, jobGraph, timeout, printStatusDuringExecution, classLoader);
+ } catch (JobExecutionException e) {
+ throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
}
+ }
+ public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
+ LOG.info("Checking and uploading JAR files");
try {
- FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(configuration);
- FiniteDuration timeout = AkkaUtils.getTimeout(configuration);
-
- LOG.info("Looking up JobManager");
- ActorGateway jobManagerGateway;
-
- LeaderRetrievalService leaderRetrievalService;
-
- try {
- leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
- } catch (Exception e) {
- throw new ProgramInvocationException("Could not create the leader retrieval service.", e);
- }
-
- try {
- jobManagerGateway = LeaderRetrievalUtils.retrieveLeaderGateway(
- leaderRetrievalService,
- actorSystem,
- lookupTimeout);
- } catch (LeaderRetrievalException e) {
- throw new ProgramInvocationException("Failed to retrieve JobManager gateway", e);
- }
-
- LOG.info("Leading JobManager actor system address is " + jobManagerGateway.path());
-
- LOG.info("JobManager runs at " + jobManagerGateway.path());
-
- LOG.info("Communication between client and JobManager will have a timeout of " + timeout);
-
- LOG.info("Checking and uploading JAR files");
- try {
- JobClient.uploadJarFiles(jobGraph, jobManagerGateway, timeout);
- } catch (IOException e) {
- throw new ProgramInvocationException("Could not upload the program's JAR files to the JobManager.", e);
- }
-
- try {
- if (wait) {
- return JobClient.submitJobAndWait(actorSystem,
- jobManagerGateway, jobGraph, timeout, printStatusDuringExecution, userCodeClassLoader);
- } else {
- JobClient.submitJobDetached(jobManagerGateway, jobGraph, timeout, userCodeClassLoader);
- // return a dummy execution result with the JobId
- return new JobSubmissionResult(jobGraph.getJobID());
- }
- } catch (JobExecutionException e) {
+ JobClient.uploadJarFiles(jobGraph, jobManagerGateway, timeout);
+ }
+ catch (IOException e) {
+ throw new ProgramInvocationException("Could not upload the program's JAR files to the JobManager.", e);
+ }
+ try {
+ this.lastJobID = jobGraph.getJobID();
+ JobClient.submitJobDetached(jobManagerGateway, jobGraph, timeout, classLoader);
+ return new JobSubmissionResult(jobGraph.getJobID());
+ } catch (JobExecutionException e) {
throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
- } catch (Exception e) {
- throw new ProgramInvocationException("Exception during program execution.", e);
- }
- } finally {
- // shut down started actor system
- actorSystem.shutdown();
-
- // wait at most for 30 seconds, to work around an occasional akka problem
- actorSystem.awaitTermination(new FiniteDuration(30, TimeUnit.SECONDS));
}
}
@@ -397,62 +431,26 @@ public class Client {
* @throws Exception In case an error occurred.
*/
public void cancel(JobID jobId) throws Exception {
- final FiniteDuration timeout = AkkaUtils.getTimeout(configuration);
- final FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(configuration);
-
- ActorSystem actorSystem;
+ Future<Object> response;
try {
- actorSystem = JobClient.startJobClientActorSystem(configuration);
+ response = jobManagerGateway.ask(new JobManagerMessages.CancelJob(jobId), timeout);
} catch (Exception e) {
- throw new ProgramInvocationException("Could start client actor system.", e);
+ throw new ProgramInvocationException("Failed to query the job manager gateway.", e);
}
- try {
- ActorGateway jobManagerGateway;
-
- LeaderRetrievalService leaderRetrievalService;
+ Object result = Await.result(response, timeout);
- try {
- leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
- } catch (Exception e) {
- throw new ProgramInvocationException("Could not create the leader retrieval service.", e);
- }
-
- try {
- jobManagerGateway = LeaderRetrievalUtils.retrieveLeaderGateway(
- leaderRetrievalService,
- actorSystem,
- lookupTimeout);
- } catch (LeaderRetrievalException e) {
- throw new ProgramInvocationException("Failed to retrieve JobManager gateway", e);
- }
-
- Future<Object> response;
- try {
- response = jobManagerGateway.ask(new JobManagerMessages.CancelJob(jobId), timeout);
- } catch (Exception e) {
- throw new ProgramInvocationException("Failed to query the job manager gateway.", e);
- }
-
- Object result = Await.result(response, timeout);
-
- if (result instanceof JobManagerMessages.CancellationSuccess) {
- LOG.debug("Job cancellation with ID " + jobId + " succeeded.");
- } else if (result instanceof JobManagerMessages.CancellationFailure) {
- Throwable t = ((JobManagerMessages.CancellationFailure) result).cause();
- LOG.debug("Job cancellation with ID " + jobId + " failed.", t);
- throw new Exception("Failed to cancel the job because of \n" + t.getMessage());
- } else {
- throw new Exception("Unknown message received while cancelling.");
- }
- } finally {
- // shut down started actor system
- actorSystem.shutdown();
- actorSystem.awaitTermination();
+ if (result instanceof JobManagerMessages.CancellationSuccess) {
+ LOG.debug("Job cancellation with ID " + jobId + " succeeded.");
+ } else if (result instanceof JobManagerMessages.CancellationFailure) {
+ Throwable t = ((JobManagerMessages.CancellationFailure) result).cause();
+ LOG.debug("Job cancellation with ID " + jobId + " failed.", t);
+ throw new Exception("Failed to cancel the job because of \n" + t.getMessage());
+ } else {
+ throw new Exception("Unknown message received while cancelling.");
}
}
-
/**
* Requests and returns the accumulators for the given job identifier. Accumulators can be
* requested while a is running or after it has finished. The default class loader is used
@@ -473,117 +471,98 @@ public class Client {
*/
public Map<String, Object> getAccumulators(JobID jobID, ClassLoader loader) throws Exception {
- final FiniteDuration timeout = AkkaUtils.getTimeout(configuration);
- final FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(configuration);
-
- ActorSystem actorSystem;
+ Future<Object> response;
try {
- actorSystem = JobClient.startJobClientActorSystem(configuration);
+ response = jobManagerGateway.ask(new RequestAccumulatorResults(jobID), timeout);
} catch (Exception e) {
- throw new Exception("Could start client actor system.", e);
+ throw new Exception("Failed to query the job manager gateway for accumulators.", e);
}
- try {
- ActorGateway jobManagerGateway;
-
- LeaderRetrievalService leaderRetrievalService;
-
- try {
- leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
- } catch (Exception e) {
- throw new ProgramInvocationException("Could not create the leader retrieval service.", e);
- }
-
- try {
- jobManagerGateway = LeaderRetrievalUtils.retrieveLeaderGateway(
- leaderRetrievalService,
- actorSystem,
- lookupTimeout);
- } catch (LeaderRetrievalException e) {
- throw new ProgramInvocationException("Failed to retrieve JobManager gateway", e);
- }
-
- Future<Object> response;
- try {
- response = jobManagerGateway.ask(new RequestAccumulatorResults(jobID), timeout);
- } catch (Exception e) {
- throw new Exception("Failed to query the job manager gateway for accumulators.", e);
- }
-
- Object result = Await.result(response, timeout);
+ Object result = Await.result(response, timeout);
- if (result instanceof AccumulatorResultsFound) {
- Map<String, SerializedValue<Object>> serializedAccumulators =
- ((AccumulatorResultsFound) result).result();
+ if (result instanceof AccumulatorResultsFound) {
+ Map<String, SerializedValue<Object>> serializedAccumulators =
+ ((AccumulatorResultsFound) result).result();
- return AccumulatorHelper.deserializeAccumulators(serializedAccumulators, loader);
+ return AccumulatorHelper.deserializeAccumulators(serializedAccumulators, loader);
- } else if (result instanceof AccumulatorResultsErroneous) {
- throw ((AccumulatorResultsErroneous) result).cause();
- } else {
- throw new Exception("Failed to fetch accumulators for the job " + jobID + ".");
- }
- } finally {
- actorSystem.shutdown();
- actorSystem.awaitTermination();
+ } else if (result instanceof AccumulatorResultsErroneous) {
+ throw ((AccumulatorResultsErroneous) result).cause();
+ } else {
+ throw new Exception("Failed to fetch accumulators for the job " + jobID + ".");
}
}
- // --------------------------------------------------------------------------------------------
+ // ------------------------------------------------------------------------
+ // Sessions
+ // ------------------------------------------------------------------------
- public static final class OptimizerPlanEnvironment extends ExecutionEnvironment {
-
- private final Optimizer compiler;
-
- private FlinkPlan optimizerPlan;
-
-
- private OptimizerPlanEnvironment(Optimizer compiler) {
- this.compiler = compiler;
- }
-
- @Override
- public JobExecutionResult execute(String jobName) throws Exception {
- Plan plan = createProgramPlan(jobName);
- this.optimizerPlan = compiler.compile(plan);
-
- // do not go on with anything now!
- throw new ProgramAbortException();
+ /**
+ * Tells the JobManager to finish the session (job) defined by the given ID.
+ *
+ * @param jobId The ID that identifies the session.
+ */
+ public void endSession(JobID jobId) throws Exception {
+ if (jobId == null) {
+ throw new IllegalArgumentException("The JobID must not be null.");
}
+ endSessions(Collections.singletonList(jobId));
+ }
- @Override
- public String getExecutionPlan() throws Exception {
- Plan plan = createProgramPlan(null, false);
- this.optimizerPlan = compiler.compile(plan);
-
- // do not go on with anything now!
- throw new ProgramAbortException();
- }
-
- private void setAsContext() {
- ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
-
- @Override
- public ExecutionEnvironment createExecutionEnvironment() {
- return OptimizerPlanEnvironment.this;
- }
- };
- initializeContextEnvironment(factory);
+ /**
+ * Tells the JobManager to finish the sessions (jobs) defined by the given IDs.
+ *
+ * @param jobIds The IDs that identify the sessions.
+ */
+ public void endSessions(List<JobID> jobIds) throws Exception {
+ if (jobIds == null) {
+ throw new IllegalArgumentException("The JobIDs must not be null");
}
- public void setPlan(FlinkPlan plan){
- this.optimizerPlan = plan;
+ for (JobID jid : jobIds) {
+ if (jid != null) {
+ LOG.info("Telling job manager to end the session {}.", jid);
+ jobManagerGateway.tell(new JobManagerMessages.RemoveCachedJob(jid));
+ }
}
}
- // --------------------------------------------------------------------------------------------
+ // ------------------------------------------------------------------------
+ // Internal translation methods
+ // ------------------------------------------------------------------------
/**
- * A special exception used to abort programs when the caller is only interested in the
- * program plan, rather than in the full execution.
+ * Creates the optimized plan for a given program, using this client's compiler.
+ *
+ * @param prog The program to be compiled.
+ * @return The compiled and optimized plan, as returned by the compiler.
+ * @throws CompilerException Thrown, if the compiler encounters an illegal situation.
+ * @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file.
*/
- public static final class ProgramAbortException extends Error {
- private static final long serialVersionUID = 1L;
+ private static OptimizedPlan getOptimizedPlan(Optimizer compiler, JobWithJars prog, int parallelism) throws CompilerException,
+ ProgramInvocationException {
+ return getOptimizedPlan(compiler, prog.getPlan(), parallelism);
}
+
+ public static JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan) throws ProgramInvocationException {
+ return getJobGraph(optPlan, prog.getAllLibraries());
+ }
+
+ private static JobGraph getJobGraph(FlinkPlan optPlan, List<File> jarFiles) {
+ JobGraph job;
+ if (optPlan instanceof StreamingPlan) {
+ job = ((StreamingPlan) optPlan).getJobGraph();
+ } else {
+ JobGraphGenerator gen = new JobGraphGenerator();
+ job = gen.compileJobGraph((OptimizedPlan) optPlan);
+ }
+
+ for (File jar : jarFiles) {
+ job.addJar(new Path(jar.getAbsolutePath()));
+ }
+
+ return job;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index 9287017..ad14a06 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -22,6 +22,7 @@ import java.io.File;
import java.util.List;
import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
@@ -39,15 +40,14 @@ public class ContextEnvironment extends ExecutionEnvironment {
private static final Logger LOG = LoggerFactory.getLogger(ContextEnvironment.class);
private final Client client;
-
+
private final List<File> jarFilesToAttach;
-
+
private final ClassLoader userCodeClassLoader;
private final boolean wait;
-
-
-
+
+
public ContextEnvironment(Client remoteConnection, List<File> jarFiles, ClassLoader userCodeClassLoader, boolean wait) {
this.client = remoteConnection;
this.jarFilesToAttach = jarFiles;
@@ -60,27 +60,33 @@ public class ContextEnvironment extends ExecutionEnvironment {
Plan p = createProgramPlan(jobName);
JobWithJars toRun = new JobWithJars(p, this.jarFilesToAttach, this.userCodeClassLoader);
- JobSubmissionResult result = this.client.run(toRun, getParallelism(), wait);
- if(result instanceof JobExecutionResult) {
- this.lastJobExecutionResult = (JobExecutionResult) result;
- return (JobExecutionResult) result;
- } else {
- LOG.warn("The Client didn't return a JobExecutionResult");
- this.lastJobExecutionResult = new JobExecutionResult(result.getJobID(), -1, null);
+ if (wait) {
+ this.lastJobExecutionResult = client.runBlocking(toRun, getParallelism());
+ return this.lastJobExecutionResult;
+ }
+ else {
+ JobSubmissionResult result = client.runDetached(toRun, getParallelism());
+ LOG.warn("Job was executed in detached mode, the results will be available on completion.");
+ this.lastJobExecutionResult = JobExecutionResult.fromJobSubmissionResult(result);
return this.lastJobExecutionResult;
}
}
@Override
public String getExecutionPlan() throws Exception {
- Plan p = createProgramPlan("unnamed job");
-
- OptimizedPlan op = (OptimizedPlan) this.client.getOptimizedPlan(p, getParallelism());
+ Plan plan = createProgramPlan("unnamed job");
+ OptimizedPlan op = Client.getOptimizedPlan(client.compiler, plan, getParallelism());
PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();
return gen.getOptimizerPlanAsJSON(op);
}
+ @Override
+ public void startNewSession() throws Exception {
+ client.endSession(jobID);
+ jobID = JobID.generate();
+ }
+
public boolean isWait() {
return wait;
}
@@ -104,7 +110,9 @@ public class ContextEnvironment extends ExecutionEnvironment {
static void setAsContext(Client client, List<File> jarFilesToAttach,
ClassLoader userCodeClassLoader, int defaultParallelism, boolean wait)
{
- initializeContextEnvironment(new ContextEnvironmentFactory(client, jarFilesToAttach, userCodeClassLoader, defaultParallelism, wait));
+ ContextEnvironmentFactory factory =
+ new ContextEnvironmentFactory(client, jarFilesToAttach, userCodeClassLoader, defaultParallelism, wait);
+ initializeContextEnvironment(factory);
}
protected static void enableLocalExecution(boolean enabled) {
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java b/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
index b86487f..9e84e2d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.client.program;
import java.io.File;
@@ -30,6 +29,10 @@ import java.util.List;
import org.apache.flink.api.common.Plan;
+/**
+ * A JobWithJars is a Flink dataflow plan, together with a bunch of JAR files that contain
+ * the classes of the functions and libraries necessary for the execution.
+ */
public class JobWithJars {
private Plan plan;
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
new file mode 100644
index 0000000..c9c3b45
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironmentFactory;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.plan.FlinkPlan;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+
+public class OptimizerPlanEnvironment extends ExecutionEnvironment {
+
+ private final Optimizer compiler;
+
+ private FlinkPlan optimizerPlan;
+
+ public OptimizerPlanEnvironment(Optimizer compiler) {
+ this.compiler = compiler;
+ }
+
+ // ------------------------------------------------------------------------
+ // Execution Environment methods
+ // ------------------------------------------------------------------------
+
+ @Override
+ public JobExecutionResult execute(String jobName) throws Exception {
+ Plan plan = createProgramPlan(jobName);
+ this.optimizerPlan = compiler.compile(plan);
+
+ // do not go on with anything now!
+ throw new ProgramAbortException();
+ }
+
+ @Override
+ public String getExecutionPlan() throws Exception {
+ Plan plan = createProgramPlan(null, false);
+ this.optimizerPlan = compiler.compile(plan);
+
+ // do not go on with anything now!
+ throw new ProgramAbortException();
+ }
+
+ @Override
+ public void startNewSession() {
+ // do nothing
+ }
+
+ public FlinkPlan getOptimizedPlan(PackagedProgram prog) throws ProgramInvocationException {
+ setAsContext();
+
+ // temporarily write syserr and sysout to a byte array.
+ PrintStream originalOut = System.out;
+ PrintStream originalErr = System.err;
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ System.setOut(new PrintStream(baos));
+ ByteArrayOutputStream baes = new ByteArrayOutputStream();
+ System.setErr(new PrintStream(baes));
+ try {
+ ContextEnvironment.enableLocalExecution(false);
+ prog.invokeInteractiveModeForExecution();
+ }
+ catch (ProgramInvocationException e) {
+ throw e;
+ }
+ catch (Throwable t) {
+ // the invocation gets aborted with the preview plan
+ if (optimizerPlan != null) {
+ return optimizerPlan;
+ } else {
+ throw new ProgramInvocationException("The program caused an error: ", t);
+ }
+ }
+ finally {
+ ContextEnvironment.enableLocalExecution(true);
+ System.setOut(originalOut);
+ System.setErr(originalErr);
+ System.err.println(baes);
+ System.out.println(baos);
+ }
+
+ throw new ProgramInvocationException(
+ "The program plan could not be fetched - the program aborted pre-maturely.\n"
+ + "System.err: " + baes.toString() + '\n'
+ + "System.out: " + baos.toString() + '\n');
+ }
+ // ------------------------------------------------------------------------
+
+ private void setAsContext() {
+ ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
+
+ @Override
+ public ExecutionEnvironment createExecutionEnvironment() {
+ return OptimizerPlanEnvironment.this;
+ }
+ };
+ initializeContextEnvironment(factory);
+ }
+
+ // ------------------------------------------------------------------------
+
+ public void setPlan(FlinkPlan plan){
+ this.optimizerPlan = plan;
+ }
+
+ /**
+ * A special exception used to abort programs when the caller is only interested in the
+ * program plan, rather than in the full execution.
+ */
+ public static final class ProgramAbortException extends Error {
+ private static final long serialVersionUID = 1L;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
index 10096da..091a959 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
@@ -40,12 +40,9 @@ import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.jar.Manifest;
-import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.Program;
import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.ExecutionEnvironmentFactory;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.dag.DataSinkNode;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
@@ -166,7 +163,7 @@ public class PackagedProgram {
}
}
- public PackagedProgram(Class<?> entryPointClass, String... args) throws ProgramInvocationException {
+ PackagedProgram(Class<?> entryPointClass, String... args) throws ProgramInvocationException {
this.jarFile = null;
this.args = args == null ? new String[0] : args;
@@ -685,51 +682,5 @@ public class PackagedProgram {
throw new ProgramInvocationException("Cannot access jar file" + (t.getMessage() == null ? "." : ": " + t.getMessage()), t);
}
}
-
- // --------------------------------------------------------------------------------------------
-
- public static final class PreviewPlanEnvironment extends ExecutionEnvironment {
-
- private List<DataSinkNode> previewPlan;
- private Plan plan;
-
- private String preview = null;
-
- @Override
- public JobExecutionResult execute(String jobName) throws Exception {
- this.plan = createProgramPlan(jobName);
- this.previewPlan = Optimizer.createPreOptimizedPlan((Plan) plan);
-
- // do not go on with anything now!
- throw new Client.ProgramAbortException();
- }
- @Override
- public String getExecutionPlan() throws Exception {
- Plan plan = createProgramPlan("unused");
- this.previewPlan = Optimizer.createPreOptimizedPlan(plan);
-
- // do not go on with anything now!
- throw new Client.ProgramAbortException();
- }
-
- public void setAsContext() {
- ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
- @Override
- public ExecutionEnvironment createExecutionEnvironment() {
- return PreviewPlanEnvironment.this;
- }
- };
- initializeContextEnvironment(factory);
- }
-
- public Plan getPlan() {
- return this.plan;
- }
-
- public void setPreview(String preview) {
- this.preview = preview;
- }
-
- }
}