You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/19 06:15:04 UTC
[flink] 15/16: [FLINK-XXXXX] Remove redundant CliFrontend.runProgram() method
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch executors-clean
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4adba90f5e627c7b3704f96fafc7ad98f12a3fcd
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Mon Nov 18 21:50:27 2019 +0100
[FLINK-XXXXX] Remove redundant CliFrontend.runProgram() method
---
.../org/apache/flink/client/cli/CliFrontend.java | 91 +---------------------
1 file changed, 1 insertion(+), 90 deletions(-)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index e8f8179..e1269a0 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -30,7 +30,6 @@ import org.apache.flink.client.FlinkPipelineTranslationUtil;
import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.client.deployment.ClusterClientServiceLoader;
import org.apache.flink.client.deployment.ClusterDescriptor;
-import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;
@@ -49,7 +48,6 @@ import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.security.SecurityConfiguration;
@@ -57,7 +55,6 @@ import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.ShutdownHookUtil;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
@@ -214,7 +211,7 @@ public class CliFrontend {
getEffectiveConfiguration(commandLine, programOptions, jobJars);
try {
- runProgram(effectiveConfiguration, program);
+ executeProgram(effectiveConfiguration, program);
} finally {
program.deleteExtractedLibraries();
}
@@ -235,92 +232,6 @@ public class CliFrontend {
return executionParameters.applyToConfiguration(effectiveConfiguration);
}
- private <ClusterID> void runProgram(
- Configuration configuration,
- PackagedProgram program) throws ProgramInvocationException, FlinkException {
-
- final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(configuration);
- checkNotNull(clusterClientFactory);
-
- final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration);
-
- try {
- final ClusterID clusterId = clusterClientFactory.getClusterId(configuration);
- final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromConfiguration(configuration);
- final ClusterClient<ClusterID> client;
-
- // directly deploy the job if the cluster is started in job mode and detached
- if (clusterId == null && executionParameters.getDetachedMode()) {
- int parallelism = executionParameters.getParallelism() == -1 ? defaultParallelism : executionParameters.getParallelism();
-
- final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism);
-
- final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
- client = clusterDescriptor.deployJobCluster(
- clusterSpecification,
- jobGraph,
- executionParameters.getDetachedMode());
-
- logAndSysout("Job has been submitted with JobID " + jobGraph.getJobID());
-
- try {
- client.close();
- } catch (Exception e) {
- LOG.info("Could not properly shut down the client.", e);
- }
- } else {
- final Thread shutdownHook;
- if (clusterId != null) {
- client = clusterDescriptor.retrieve(clusterId);
- shutdownHook = null;
- } else {
- // also in job mode we have to deploy a session cluster because the job
- // might consist of multiple parts (e.g. when using collect)
- final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
- client = clusterDescriptor.deploySessionCluster(clusterSpecification);
- // if not running in detached mode, add a shutdown hook to shut down cluster if client exits
- // there's a race-condition here if cli is killed before shutdown hook is installed
- if (!executionParameters.getDetachedMode() && executionParameters.isShutdownOnAttachedExit()) {
- shutdownHook = ShutdownHookUtil.addShutdownHook(client::shutDownCluster, client.getClass().getSimpleName(), LOG);
- } else {
- shutdownHook = null;
- }
- }
-
- try {
- int userParallelism = executionParameters.getParallelism();
- LOG.debug("User parallelism is set to {}", userParallelism);
-
- executeProgram(configuration, program);
- } finally {
- if (clusterId == null && !executionParameters.getDetachedMode()) {
- // terminate the cluster only if we have started it before and if it's not detached
- try {
- client.shutDownCluster();
- } catch (final Exception e) {
- LOG.info("Could not properly terminate the Flink cluster.", e);
- }
- if (shutdownHook != null) {
- // we do not need the hook anymore as we have just tried to shutdown the cluster.
- ShutdownHookUtil.removeShutdownHook(shutdownHook, client.getClass().getSimpleName(), LOG);
- }
- }
- try {
- client.close();
- } catch (Exception e) {
- LOG.info("Could not properly shut down the client.", e);
- }
- }
- }
- } finally {
- try {
- clusterDescriptor.close();
- } catch (Exception e) {
- LOG.info("Could not properly close the cluster descriptor.", e);
- }
- }
- }
-
/**
* Executes the info action.
*