You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/18 13:11:10 UTC

[flink] 14/19: Wired everything together

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git

commit df355d9c4a282ff93f8fbddacfc1c7df0f774ef3
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Sun Nov 17 13:49:02 2019 +0100

    Wired everything together
---
 .../java/org/apache/flink/client/ClientUtils.java  |   1 -
 .../org/apache/flink/client/cli/CliFrontend.java   | 102 ++-------------------
 .../flink/client/cli/CliFrontendRunTest.java       |   3 +-
 .../execution/DefaultExecutorServiceLoader.java    |   2 +-
 4 files changed, 10 insertions(+), 98 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index 5824832..f971982 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -30,7 +30,6 @@ import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.client.program.ProgramMissingJobException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
 import org.apache.flink.core.execution.ExecutorServiceLoader;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 258708a..8dfe306 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -30,7 +30,6 @@ import org.apache.flink.client.FlinkPipelineTranslationUtil;
 import org.apache.flink.client.deployment.ClusterClientFactory;
 import org.apache.flink.client.deployment.ClusterClientServiceLoader;
 import org.apache.flink.client.deployment.ClusterDescriptor;
-import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
@@ -45,11 +44,11 @@ import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
+import org.apache.flink.core.execution.ExecutorServiceLoader;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.plugin.PluginUtils;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.security.SecurityConfiguration;
@@ -57,7 +56,6 @@ import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.ShutdownHookUtil;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
@@ -215,7 +213,7 @@ public class CliFrontend {
 				program.getJobJarAndDependencies());
 
 		try {
-			runProgram(effectiveConfiguration, program);
+			execute(effectiveConfiguration, program);
 		} finally {
 			program.deleteExtractedLibraries();
 		}
@@ -236,92 +234,6 @@ public class CliFrontend {
 		return executionParameters.applyToConfiguration(effectiveConfiguration);
 	}
 
-	private <ClusterID> void runProgram(
-			Configuration configuration,
-			PackagedProgram program) throws ProgramInvocationException, FlinkException {
-
-		final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(configuration);
-		checkNotNull(clusterClientFactory);
-
-		final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration);
-
-		try {
-			final ClusterID clusterId = clusterClientFactory.getClusterId(configuration);
-			final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromConfiguration(configuration);
-			final ClusterClient<ClusterID> client;
-
-			// directly deploy the job if the cluster is started in job mode and detached
-			if (clusterId == null && executionParameters.getDetachedMode()) {
-				int parallelism = executionParameters.getParallelism() == -1 ? defaultParallelism : executionParameters.getParallelism();
-
-				final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism);
-
-				final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
-				client = clusterDescriptor.deployJobCluster(
-					clusterSpecification,
-					jobGraph,
-					executionParameters.getDetachedMode());
-
-				logAndSysout("Job has been submitted with JobID " + jobGraph.getJobID());
-
-				try {
-					client.close();
-				} catch (Exception e) {
-					LOG.info("Could not properly shut down the client.", e);
-				}
-			} else {
-				final Thread shutdownHook;
-				if (clusterId != null) {
-					client = clusterDescriptor.retrieve(clusterId);
-					shutdownHook = null;
-				} else {
-					// also in job mode we have to deploy a session cluster because the job
-					// might consist of multiple parts (e.g. when using collect)
-					final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
-					client = clusterDescriptor.deploySessionCluster(clusterSpecification);
-					// if not running in detached mode, add a shutdown hook to shut down cluster if client exits
-					// there's a race-condition here if cli is killed before shutdown hook is installed
-					if (!executionParameters.getDetachedMode() && executionParameters.isShutdownOnAttachedExit()) {
-						shutdownHook = ShutdownHookUtil.addShutdownHook(client::shutDownCluster, client.getClass().getSimpleName(), LOG);
-					} else {
-						shutdownHook = null;
-					}
-				}
-
-				try {
-					int userParallelism = executionParameters.getParallelism();
-					LOG.debug("User parallelism is set to {}", userParallelism);
-
-					executeProgram(configuration, program);
-				} finally {
-					if (clusterId == null && !executionParameters.getDetachedMode()) {
-						// terminate the cluster only if we have started it before and if it's not detached
-						try {
-							client.shutDownCluster();
-						} catch (final Exception e) {
-							LOG.info("Could not properly terminate the Flink cluster.", e);
-						}
-						if (shutdownHook != null) {
-							// we do not need the hook anymore as we have just tried to shutdown the cluster.
-							ShutdownHookUtil.removeShutdownHook(shutdownHook, client.getClass().getSimpleName(), LOG);
-						}
-					}
-					try {
-						client.close();
-					} catch (Exception e) {
-						LOG.info("Could not properly shut down the client.", e);
-					}
-				}
-			}
-		} finally {
-			try {
-				clusterDescriptor.close();
-			} catch (Exception e) {
-				LOG.info("Could not properly close the cluster descriptor.", e);
-			}
-		}
-	}
-
 	/**
 	 * Executes the info action.
 	 *
@@ -751,12 +663,14 @@ public class CliFrontend {
 	//  Interaction with programs and JobManager
 	// --------------------------------------------------------------------------------------------
 
-	protected void executeProgram(
-			Configuration configuration,
-			PackagedProgram program) throws ProgramMissingJobException, ProgramInvocationException {
+	protected void execute(final Configuration configuration, final PackagedProgram program) throws ProgramInvocationException, FlinkException {
+		checkNotNull(configuration);
+		checkNotNull(program);
+
 		logAndSysout("Starting execution of program");
 
-		JobSubmissionResult result = ClientUtils.executeProgram(new DefaultExecutorServiceLoader(), configuration, program);
+		final ExecutorServiceLoader executorServiceLoader = new DefaultExecutorServiceLoader();
+		final JobSubmissionResult result = ClientUtils.executeProgram(executorServiceLoader, configuration, program);
 
 		if (result.isJobExecutionResult()) {
 			logAndSysout("Program execution finished");
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
index a0d551b..50232ba 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.client.cli;
 
 import org.apache.flink.client.deployment.ClusterClientServiceLoader;
 import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
-import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@@ -198,7 +197,7 @@ public class CliFrontendRunTest extends CliFrontendTestBase {
 		}
 
 		@Override
-		protected void executeProgram(Configuration configuration, PackagedProgram program, ClusterClient client) {
+		protected void execute(final Configuration configuration, final PackagedProgram program) {
 			final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
 			assertEquals(isDetached, executionConfigAccessor.getDetachedMode());
 			assertEquals(expectedParallelism, executionConfigAccessor.getParallelism());
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
index b627b71..297b17e 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
@@ -32,9 +32,9 @@ import java.util.stream.Collectors;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * todo make it singleton
  * The default implementation of the {@link ExecutorServiceLoader}. This implementation uses
  * Java service discovery to find the available {@link ExecutorFactory executor factories}.
+ * TODO: make this class a singleton.
  */
 public class DefaultExecutorServiceLoader implements ExecutorServiceLoader {