Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/19 06:15:04 UTC

[flink] 15/16: [FLINK-XXXXX] Remove redundant CliFrontend.runProgram() method

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executors-clean
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4adba90f5e627c7b3704f96fafc7ad98f12a3fcd
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Mon Nov 18 21:50:27 2019 +0100

    [FLINK-XXXXX] Remove redundant CliFrontend.runProgram() method
---
 .../org/apache/flink/client/cli/CliFrontend.java   | 91 +---------------------
 1 file changed, 1 insertion(+), 90 deletions(-)
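
For orientation, the change collapses CliFrontend's run path to a single delegation: run() builds the effective configuration and hands the packaged program straight to executeProgram(), while the removed runProgram() branching (detached job-cluster deployment vs. session cluster plus shutdown hook) is presumably covered by the executor-based code path this branch (executors-clean) introduces. The following is a minimal standalone sketch of that post-commit shape, not the actual Flink classes; Config, Program and ProgramRunner are hypothetical stand-ins.

    // Illustrative sketch only -- Config, Program and ProgramRunner are hypothetical
    // stand-ins, not Flink classes. It mirrors the post-commit shape of
    // CliFrontend.run(): build the effective configuration, delegate to a single
    // executeProgram(...) call, and clean up extracted libraries in a finally block.
    public final class ProgramRunner {

        /** Stand-in for the effective configuration (target executor, parallelism, ...). */
        static final class Config { }

        /** Stand-in for a packaged user program with extracted helper jars. */
        static final class Program {
            void deleteExtractedLibraries() {
                // remove temporary jars extracted for this run
            }
        }

        /** Single code path: no CLI-side job-cluster vs. session-cluster branching. */
        void run(Config effectiveConfiguration, Program program) throws Exception {
            try {
                executeProgram(effectiveConfiguration, program);
            } finally {
                program.deleteExtractedLibraries();
            }
        }

        void executeProgram(Config configuration, Program program) {
            // In Flink this invokes the user program; where and how the job is deployed
            // is derived from the configuration rather than from the removed runProgram().
            System.out.println("executing program with effective configuration");
        }

        public static void main(String[] args) throws Exception {
            new ProgramRunner().run(new Config(), new Program());
        }
    }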

diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index e8f8179..e1269a0 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -30,7 +30,6 @@ import org.apache.flink.client.FlinkPipelineTranslationUtil;
 import org.apache.flink.client.deployment.ClusterClientFactory;
 import org.apache.flink.client.deployment.ClusterClientServiceLoader;
 import org.apache.flink.client.deployment.ClusterDescriptor;
-import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
@@ -49,7 +48,6 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.plugin.PluginUtils;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.security.SecurityConfiguration;
@@ -57,7 +55,6 @@ import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.ShutdownHookUtil;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
@@ -214,7 +211,7 @@ public class CliFrontend {
 				getEffectiveConfiguration(commandLine, programOptions, jobJars);
 
 		try {
-			runProgram(effectiveConfiguration, program);
+			executeProgram(effectiveConfiguration, program);
 		} finally {
 			program.deleteExtractedLibraries();
 		}
@@ -235,92 +232,6 @@ public class CliFrontend {
 		return executionParameters.applyToConfiguration(effectiveConfiguration);
 	}
 
-	private <ClusterID> void runProgram(
-			Configuration configuration,
-			PackagedProgram program) throws ProgramInvocationException, FlinkException {
-
-		final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(configuration);
-		checkNotNull(clusterClientFactory);
-
-		final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration);
-
-		try {
-			final ClusterID clusterId = clusterClientFactory.getClusterId(configuration);
-			final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromConfiguration(configuration);
-			final ClusterClient<ClusterID> client;
-
-			// directly deploy the job if the cluster is started in job mode and detached
-			if (clusterId == null && executionParameters.getDetachedMode()) {
-				int parallelism = executionParameters.getParallelism() == -1 ? defaultParallelism : executionParameters.getParallelism();
-
-				final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism);
-
-				final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
-				client = clusterDescriptor.deployJobCluster(
-					clusterSpecification,
-					jobGraph,
-					executionParameters.getDetachedMode());
-
-				logAndSysout("Job has been submitted with JobID " + jobGraph.getJobID());
-
-				try {
-					client.close();
-				} catch (Exception e) {
-					LOG.info("Could not properly shut down the client.", e);
-				}
-			} else {
-				final Thread shutdownHook;
-				if (clusterId != null) {
-					client = clusterDescriptor.retrieve(clusterId);
-					shutdownHook = null;
-				} else {
-					// also in job mode we have to deploy a session cluster because the job
-					// might consist of multiple parts (e.g. when using collect)
-					final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
-					client = clusterDescriptor.deploySessionCluster(clusterSpecification);
-					// if not running in detached mode, add a shutdown hook to shut down cluster if client exits
-					// there's a race-condition here if cli is killed before shutdown hook is installed
-					if (!executionParameters.getDetachedMode() && executionParameters.isShutdownOnAttachedExit()) {
-						shutdownHook = ShutdownHookUtil.addShutdownHook(client::shutDownCluster, client.getClass().getSimpleName(), LOG);
-					} else {
-						shutdownHook = null;
-					}
-				}
-
-				try {
-					int userParallelism = executionParameters.getParallelism();
-					LOG.debug("User parallelism is set to {}", userParallelism);
-
-					executeProgram(configuration, program);
-				} finally {
-					if (clusterId == null && !executionParameters.getDetachedMode()) {
-						// terminate the cluster only if we have started it before and if it's not detached
-						try {
-							client.shutDownCluster();
-						} catch (final Exception e) {
-							LOG.info("Could not properly terminate the Flink cluster.", e);
-						}
-						if (shutdownHook != null) {
-							// we do not need the hook anymore as we have just tried to shutdown the cluster.
-							ShutdownHookUtil.removeShutdownHook(shutdownHook, client.getClass().getSimpleName(), LOG);
-						}
-					}
-					try {
-						client.close();
-					} catch (Exception e) {
-						LOG.info("Could not properly shut down the client.", e);
-					}
-				}
-			}
-		} finally {
-			try {
-				clusterDescriptor.close();
-			} catch (Exception e) {
-				LOG.info("Could not properly close the cluster descriptor.", e);
-			}
-		}
-	}
-
 	/**
 	 * Executes the info action.
 	 *