You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/17 14:45:06 UTC
[flink] 13/14: Wired verything together
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 179dbc1107cf91fe7bc36b3812434d2d82dfe01f
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Sun Nov 17 13:49:02 2019 +0100
Wired verything together
---
.../java/org/apache/flink/client/ClientUtils.java | 1 -
.../org/apache/flink/client/cli/CliFrontend.java | 186 ++++++++++-----------
.../flink/client/cli/CliFrontendRunTest.java | 3 +-
.../execution/DefaultExecutorServiceLoader.java | 2 +-
4 files changed, 95 insertions(+), 97 deletions(-)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index 5824832..f971982 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -30,7 +30,6 @@ import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.ProgramMissingJobException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
import org.apache.flink.core.execution.ExecutorServiceLoader;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 258708a..89a8f92 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -30,7 +30,6 @@ import org.apache.flink.client.FlinkPipelineTranslationUtil;
import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.client.deployment.ClusterClientServiceLoader;
import org.apache.flink.client.deployment.ClusterDescriptor;
-import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;
@@ -45,11 +44,11 @@ import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
+import org.apache.flink.core.execution.ExecutorServiceLoader;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.security.SecurityConfiguration;
@@ -57,7 +56,6 @@ import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.ShutdownHookUtil;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
@@ -215,7 +213,7 @@ public class CliFrontend {
program.getJobJarAndDependencies());
try {
- runProgram(effectiveConfiguration, program);
+ execute(effectiveConfiguration, program);
} finally {
program.deleteExtractedLibraries();
}
@@ -236,91 +234,91 @@ public class CliFrontend {
return executionParameters.applyToConfiguration(effectiveConfiguration);
}
- private <ClusterID> void runProgram(
- Configuration configuration,
- PackagedProgram program) throws ProgramInvocationException, FlinkException {
-
- final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(configuration);
- checkNotNull(clusterClientFactory);
-
- final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration);
-
- try {
- final ClusterID clusterId = clusterClientFactory.getClusterId(configuration);
- final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromConfiguration(configuration);
- final ClusterClient<ClusterID> client;
-
- // directly deploy the job if the cluster is started in job mode and detached
- if (clusterId == null && executionParameters.getDetachedMode()) {
- int parallelism = executionParameters.getParallelism() == -1 ? defaultParallelism : executionParameters.getParallelism();
-
- final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism);
-
- final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
- client = clusterDescriptor.deployJobCluster(
- clusterSpecification,
- jobGraph,
- executionParameters.getDetachedMode());
-
- logAndSysout("Job has been submitted with JobID " + jobGraph.getJobID());
-
- try {
- client.close();
- } catch (Exception e) {
- LOG.info("Could not properly shut down the client.", e);
- }
- } else {
- final Thread shutdownHook;
- if (clusterId != null) {
- client = clusterDescriptor.retrieve(clusterId);
- shutdownHook = null;
- } else {
- // also in job mode we have to deploy a session cluster because the job
- // might consist of multiple parts (e.g. when using collect)
- final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
- client = clusterDescriptor.deploySessionCluster(clusterSpecification);
- // if not running in detached mode, add a shutdown hook to shut down cluster if client exits
- // there's a race-condition here if cli is killed before shutdown hook is installed
- if (!executionParameters.getDetachedMode() && executionParameters.isShutdownOnAttachedExit()) {
- shutdownHook = ShutdownHookUtil.addShutdownHook(client::shutDownCluster, client.getClass().getSimpleName(), LOG);
- } else {
- shutdownHook = null;
- }
- }
-
- try {
- int userParallelism = executionParameters.getParallelism();
- LOG.debug("User parallelism is set to {}", userParallelism);
-
- executeProgram(configuration, program);
- } finally {
- if (clusterId == null && !executionParameters.getDetachedMode()) {
- // terminate the cluster only if we have started it before and if it's not detached
- try {
- client.shutDownCluster();
- } catch (final Exception e) {
- LOG.info("Could not properly terminate the Flink cluster.", e);
- }
- if (shutdownHook != null) {
- // we do not need the hook anymore as we have just tried to shutdown the cluster.
- ShutdownHookUtil.removeShutdownHook(shutdownHook, client.getClass().getSimpleName(), LOG);
- }
- }
- try {
- client.close();
- } catch (Exception e) {
- LOG.info("Could not properly shut down the client.", e);
- }
- }
- }
- } finally {
- try {
- clusterDescriptor.close();
- } catch (Exception e) {
- LOG.info("Could not properly close the cluster descriptor.", e);
- }
- }
- }
+// private <ClusterID> void runProgram(
+// Configuration configuration,
+// PackagedProgram program) throws ProgramInvocationException, FlinkException {
+//
+// final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(configuration);
+// checkNotNull(clusterClientFactory);
+//
+// final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration);
+//
+// try {
+// final ClusterID clusterId = clusterClientFactory.getClusterId(configuration);
+// final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromConfiguration(configuration);
+// final ClusterClient<ClusterID> client;
+//
+// // directly deploy the job if the cluster is started in job mode and detached
+// if (clusterId == null && executionParameters.getDetachedMode()) {
+// int parallelism = executionParameters.getParallelism() == -1 ? defaultParallelism : executionParameters.getParallelism();
+//
+// final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism);
+//
+// final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
+// client = clusterDescriptor.deployJobCluster(
+// clusterSpecification,
+// jobGraph,
+// executionParameters.getDetachedMode());
+//
+// logAndSysout("Job has been submitted with JobID " + jobGraph.getJobID());
+//
+// try {
+// client.close();
+// } catch (Exception e) {
+// LOG.info("Could not properly shut down the client.", e);
+// }
+// } else {
+// final Thread shutdownHook;
+// if (clusterId != null) {
+// client = clusterDescriptor.retrieve(clusterId);
+// shutdownHook = null;
+// } else {
+// // also in job mode we have to deploy a session cluster because the job
+// // might consist of multiple parts (e.g. when using collect)
+// final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
+// client = clusterDescriptor.deploySessionCluster(clusterSpecification);
+// // if not running in detached mode, add a shutdown hook to shut down cluster if client exits
+// // there's a race-condition here if cli is killed before shutdown hook is installed
+// if (!executionParameters.getDetachedMode() && executionParameters.isShutdownOnAttachedExit()) {
+// shutdownHook = ShutdownHookUtil.addShutdownHook(client::shutDownCluster, client.getClass().getSimpleName(), LOG);
+// } else {
+// shutdownHook = null;
+// }
+// }
+//
+// try {
+// int userParallelism = executionParameters.getParallelism();
+// LOG.debug("User parallelism is set to {}", userParallelism);
+//
+// executeProgram(configuration, program);
+// } finally {
+// if (clusterId == null && !executionParameters.getDetachedMode()) {
+// // terminate the cluster only if we have started it before and if it's not detached
+// try {
+// client.shutDownCluster();
+// } catch (final Exception e) {
+// LOG.info("Could not properly terminate the Flink cluster.", e);
+// }
+// if (shutdownHook != null) {
+// // we do not need the hook anymore as we have just tried to shutdown the cluster.
+// ShutdownHookUtil.removeShutdownHook(shutdownHook, client.getClass().getSimpleName(), LOG);
+// }
+// }
+// try {
+// client.close();
+// } catch (Exception e) {
+// LOG.info("Could not properly shut down the client.", e);
+// }
+// }
+// }
+// } finally {
+// try {
+// clusterDescriptor.close();
+// } catch (Exception e) {
+// LOG.info("Could not properly close the cluster descriptor.", e);
+// }
+// }
+// }
/**
* Executes the info action.
@@ -751,12 +749,14 @@ public class CliFrontend {
// Interaction with programs and JobManager
// --------------------------------------------------------------------------------------------
- protected void executeProgram(
- Configuration configuration,
- PackagedProgram program) throws ProgramMissingJobException, ProgramInvocationException {
+ protected void execute(final Configuration configuration, final PackagedProgram program) throws ProgramInvocationException, FlinkException {
+ checkNotNull(configuration);
+ checkNotNull(program);
+
logAndSysout("Starting execution of program");
- JobSubmissionResult result = ClientUtils.executeProgram(new DefaultExecutorServiceLoader(), configuration, program);
+ final ExecutorServiceLoader executorServiceLoader = new DefaultExecutorServiceLoader();
+ final JobSubmissionResult result = ClientUtils.executeProgram(executorServiceLoader, configuration, program);
if (result.isJobExecutionResult()) {
logAndSysout("Program execution finished");
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
index a0d551b..50232ba 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.client.cli;
import org.apache.flink.client.deployment.ClusterClientServiceLoader;
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
-import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@@ -198,7 +197,7 @@ public class CliFrontendRunTest extends CliFrontendTestBase {
}
@Override
- protected void executeProgram(Configuration configuration, PackagedProgram program, ClusterClient client) {
+ protected void execute(final Configuration configuration, final PackagedProgram program) {
final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
assertEquals(isDetached, executionConfigAccessor.getDetachedMode());
assertEquals(expectedParallelism, executionConfigAccessor.getParallelism());
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
index b627b71..297b17e 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
@@ -32,9 +32,9 @@ import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * todo make it singleton
* The default implementation of the {@link ExecutorServiceLoader}. This implementation uses
* Java service discovery to find the available {@link ExecutorFactory executor factories}.
+ * MAKE IT A SINGLETON.
*/
public class DefaultExecutorServiceLoader implements ExecutorServiceLoader {