You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/17 14:45:06 UTC

[flink] 13/14: Wired verything together

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 179dbc1107cf91fe7bc36b3812434d2d82dfe01f
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Sun Nov 17 13:49:02 2019 +0100

    Wired verything together
---
 .../java/org/apache/flink/client/ClientUtils.java  |   1 -
 .../org/apache/flink/client/cli/CliFrontend.java   | 186 ++++++++++-----------
 .../flink/client/cli/CliFrontendRunTest.java       |   3 +-
 .../execution/DefaultExecutorServiceLoader.java    |   2 +-
 4 files changed, 95 insertions(+), 97 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index 5824832..f971982 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -30,7 +30,6 @@ import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.client.program.ProgramMissingJobException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
 import org.apache.flink.core.execution.ExecutorServiceLoader;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 258708a..89a8f92 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -30,7 +30,6 @@ import org.apache.flink.client.FlinkPipelineTranslationUtil;
 import org.apache.flink.client.deployment.ClusterClientFactory;
 import org.apache.flink.client.deployment.ClusterClientServiceLoader;
 import org.apache.flink.client.deployment.ClusterDescriptor;
-import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
@@ -45,11 +44,11 @@ import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
+import org.apache.flink.core.execution.ExecutorServiceLoader;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.plugin.PluginUtils;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.security.SecurityConfiguration;
@@ -57,7 +56,6 @@ import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.ShutdownHookUtil;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
@@ -215,7 +213,7 @@ public class CliFrontend {
 				program.getJobJarAndDependencies());
 
 		try {
-			runProgram(effectiveConfiguration, program);
+			execute(effectiveConfiguration, program);
 		} finally {
 			program.deleteExtractedLibraries();
 		}
@@ -236,91 +234,91 @@ public class CliFrontend {
 		return executionParameters.applyToConfiguration(effectiveConfiguration);
 	}
 
-	private <ClusterID> void runProgram(
-			Configuration configuration,
-			PackagedProgram program) throws ProgramInvocationException, FlinkException {
-
-		final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(configuration);
-		checkNotNull(clusterClientFactory);
-
-		final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration);
-
-		try {
-			final ClusterID clusterId = clusterClientFactory.getClusterId(configuration);
-			final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromConfiguration(configuration);
-			final ClusterClient<ClusterID> client;
-
-			// directly deploy the job if the cluster is started in job mode and detached
-			if (clusterId == null && executionParameters.getDetachedMode()) {
-				int parallelism = executionParameters.getParallelism() == -1 ? defaultParallelism : executionParameters.getParallelism();
-
-				final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism);
-
-				final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
-				client = clusterDescriptor.deployJobCluster(
-					clusterSpecification,
-					jobGraph,
-					executionParameters.getDetachedMode());
-
-				logAndSysout("Job has been submitted with JobID " + jobGraph.getJobID());
-
-				try {
-					client.close();
-				} catch (Exception e) {
-					LOG.info("Could not properly shut down the client.", e);
-				}
-			} else {
-				final Thread shutdownHook;
-				if (clusterId != null) {
-					client = clusterDescriptor.retrieve(clusterId);
-					shutdownHook = null;
-				} else {
-					// also in job mode we have to deploy a session cluster because the job
-					// might consist of multiple parts (e.g. when using collect)
-					final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
-					client = clusterDescriptor.deploySessionCluster(clusterSpecification);
-					// if not running in detached mode, add a shutdown hook to shut down cluster if client exits
-					// there's a race-condition here if cli is killed before shutdown hook is installed
-					if (!executionParameters.getDetachedMode() && executionParameters.isShutdownOnAttachedExit()) {
-						shutdownHook = ShutdownHookUtil.addShutdownHook(client::shutDownCluster, client.getClass().getSimpleName(), LOG);
-					} else {
-						shutdownHook = null;
-					}
-				}
-
-				try {
-					int userParallelism = executionParameters.getParallelism();
-					LOG.debug("User parallelism is set to {}", userParallelism);
-
-					executeProgram(configuration, program);
-				} finally {
-					if (clusterId == null && !executionParameters.getDetachedMode()) {
-						// terminate the cluster only if we have started it before and if it's not detached
-						try {
-							client.shutDownCluster();
-						} catch (final Exception e) {
-							LOG.info("Could not properly terminate the Flink cluster.", e);
-						}
-						if (shutdownHook != null) {
-							// we do not need the hook anymore as we have just tried to shutdown the cluster.
-							ShutdownHookUtil.removeShutdownHook(shutdownHook, client.getClass().getSimpleName(), LOG);
-						}
-					}
-					try {
-						client.close();
-					} catch (Exception e) {
-						LOG.info("Could not properly shut down the client.", e);
-					}
-				}
-			}
-		} finally {
-			try {
-				clusterDescriptor.close();
-			} catch (Exception e) {
-				LOG.info("Could not properly close the cluster descriptor.", e);
-			}
-		}
-	}
+//	private <ClusterID> void runProgram(
+//			Configuration configuration,
+//			PackagedProgram program) throws ProgramInvocationException, FlinkException {
+//
+//		final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(configuration);
+//		checkNotNull(clusterClientFactory);
+//
+//		final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration);
+//
+//		try {
+//			final ClusterID clusterId = clusterClientFactory.getClusterId(configuration);
+//			final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromConfiguration(configuration);
+//			final ClusterClient<ClusterID> client;
+//
+//			// directly deploy the job if the cluster is started in job mode and detached
+//			if (clusterId == null && executionParameters.getDetachedMode()) {
+//				int parallelism = executionParameters.getParallelism() == -1 ? defaultParallelism : executionParameters.getParallelism();
+//
+//				final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism);
+//
+//				final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
+//				client = clusterDescriptor.deployJobCluster(
+//					clusterSpecification,
+//					jobGraph,
+//					executionParameters.getDetachedMode());
+//
+//				logAndSysout("Job has been submitted with JobID " + jobGraph.getJobID());
+//
+//				try {
+//					client.close();
+//				} catch (Exception e) {
+//					LOG.info("Could not properly shut down the client.", e);
+//				}
+//			} else {
+//				final Thread shutdownHook;
+//				if (clusterId != null) {
+//					client = clusterDescriptor.retrieve(clusterId);
+//					shutdownHook = null;
+//				} else {
+//					// also in job mode we have to deploy a session cluster because the job
+//					// might consist of multiple parts (e.g. when using collect)
+//					final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
+//					client = clusterDescriptor.deploySessionCluster(clusterSpecification);
+//					// if not running in detached mode, add a shutdown hook to shut down cluster if client exits
+//					// there's a race-condition here if cli is killed before shutdown hook is installed
+//					if (!executionParameters.getDetachedMode() && executionParameters.isShutdownOnAttachedExit()) {
+//						shutdownHook = ShutdownHookUtil.addShutdownHook(client::shutDownCluster, client.getClass().getSimpleName(), LOG);
+//					} else {
+//						shutdownHook = null;
+//					}
+//				}
+//
+//				try {
+//					int userParallelism = executionParameters.getParallelism();
+//					LOG.debug("User parallelism is set to {}", userParallelism);
+//
+//					executeProgram(configuration, program);
+//				} finally {
+//					if (clusterId == null && !executionParameters.getDetachedMode()) {
+//						// terminate the cluster only if we have started it before and if it's not detached
+//						try {
+//							client.shutDownCluster();
+//						} catch (final Exception e) {
+//							LOG.info("Could not properly terminate the Flink cluster.", e);
+//						}
+//						if (shutdownHook != null) {
+//							// we do not need the hook anymore as we have just tried to shutdown the cluster.
+//							ShutdownHookUtil.removeShutdownHook(shutdownHook, client.getClass().getSimpleName(), LOG);
+//						}
+//					}
+//					try {
+//						client.close();
+//					} catch (Exception e) {
+//						LOG.info("Could not properly shut down the client.", e);
+//					}
+//				}
+//			}
+//		} finally {
+//			try {
+//				clusterDescriptor.close();
+//			} catch (Exception e) {
+//				LOG.info("Could not properly close the cluster descriptor.", e);
+//			}
+//		}
+//	}
 
 	/**
 	 * Executes the info action.
@@ -751,12 +749,14 @@ public class CliFrontend {
 	//  Interaction with programs and JobManager
 	// --------------------------------------------------------------------------------------------
 
-	protected void executeProgram(
-			Configuration configuration,
-			PackagedProgram program) throws ProgramMissingJobException, ProgramInvocationException {
+	protected void execute(final Configuration configuration, final PackagedProgram program) throws ProgramInvocationException, FlinkException {
+		checkNotNull(configuration);
+		checkNotNull(program);
+
 		logAndSysout("Starting execution of program");
 
-		JobSubmissionResult result = ClientUtils.executeProgram(new DefaultExecutorServiceLoader(), configuration, program);
+		final ExecutorServiceLoader executorServiceLoader = new DefaultExecutorServiceLoader();
+		final JobSubmissionResult result = ClientUtils.executeProgram(executorServiceLoader, configuration, program);
 
 		if (result.isJobExecutionResult()) {
 			logAndSysout("Program execution finished");
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
index a0d551b..50232ba 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.client.cli;
 
 import org.apache.flink.client.deployment.ClusterClientServiceLoader;
 import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
-import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@@ -198,7 +197,7 @@ public class CliFrontendRunTest extends CliFrontendTestBase {
 		}
 
 		@Override
-		protected void executeProgram(Configuration configuration, PackagedProgram program, ClusterClient client) {
+		protected void execute(final Configuration configuration, final PackagedProgram program) {
 			final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
 			assertEquals(isDetached, executionConfigAccessor.getDetachedMode());
 			assertEquals(expectedParallelism, executionConfigAccessor.getParallelism());
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
index b627b71..297b17e 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
@@ -32,9 +32,9 @@ import java.util.stream.Collectors;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * todo make it singleton
  * The default implementation of the {@link ExecutorServiceLoader}. This implementation uses
  * Java service discovery to find the available {@link ExecutorFactory executor factories}.
+ * MAKE IT A SINGLETON.
  */
 public class DefaultExecutorServiceLoader implements ExecutorServiceLoader {